Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: Fuseki doesn't stop after client's timeout (DEV-1190) (#2175)
* fix warning

* remove unused class

* improve make command to load big dataset

* introduce separate timeouts for gravsearch and other searches

* Update application.conf

* remove unused import

* remove unused timeout

* keep only one request config

* fix typo

Co-authored-by: Ivan Subotic <400790+subotic@users.noreply.github.com>
  • Loading branch information
irinaschubert and subotic committed Aug 26, 2022
1 parent 5d4e57e commit 90f86b5
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -278,7 +278,7 @@ init-db-test-from-staging: db_staging_dump.trig init-db-test-empty ## init local
init-db-test-from-prod: db_prod_dump.trig init-db-test-empty ## init local database with data from production
@echo $@
@curl -X POST -H "Content-Type: application/sparql-update" -d "DROP ALL" -u "admin:test" "http://localhost:3030/knora-test"
@curl -X POST -H "Content-Type: application/trig" --data-binary "@${CURRENT_DIR}/db_prod_dump.trig" -u "admin:test" "http://localhost:3030/knora-test"
@curl -X POST -H "Content-Type: application/trig" -T "${CURRENT_DIR}/db_prod_dump.trig" -u "admin:test" "http://localhost:3030/knora-test"

.PHONY: init-db-test-from-ls-test-server
init-db-test-from-ls-test-server: db_ls_test_server_dump.trig init-db-test-from-ls-test-server-trig-file ## init local database with data from ls-test-server
Expand Down
6 changes: 3 additions & 3 deletions webapi/src/main/resources/application.conf
Expand Up @@ -242,7 +242,7 @@ app {
print-extended-config = ${?KNORA_WEBAPI_PRINT_EXTENDED_CONFIG}

// default ask timeout. can be same or lower then akka.http.server.request-timeout.
default-timeout = 120 minutes
default-timeout = 120 minutes // a timeout here should never happen

// If true, log all messages sent from and received by routes. Since messages are logged at DEBUG level, you will
// need to set loglevel = "DEBUG" in the akka section of this file, and <root level="DEBUG"> in logback.xml.
Expand Down Expand Up @@ -457,8 +457,8 @@ app {
// timeout for triplestore queries. can be same or lower then akka.http.server.request-timeout.
query-timeout = 20 seconds

// timeout for tripelstore updates. can be same or lower then akka.http.server.request-timeout.
update-timeout = 120 minutes
// timeout for Gravsearch queries. can be same or lower then akka.http.server.request-timeout.
gravsearch-timeout = 120 seconds

// triplestore auto init. initialize triplestore at startup if necessary.
auto-init = false
Expand Down
6 changes: 3 additions & 3 deletions webapi/src/main/scala/org/knora/webapi/config/AppConfig.scala
Expand Up @@ -88,13 +88,13 @@ final case class Triplestore(
useHttps: Boolean,
host: String,
queryTimeout: String,
updateTimeout: String,
gravsearchTimeout: String,
autoInit: Boolean,
profileQueries: Boolean,
fuseki: Fuseki
) {
val queryTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(queryTimeout))
val updateTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(updateTimeout))
val queryTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(queryTimeout))
val gravsearchTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(gravsearchTimeout))
}

/**
Expand Down
Expand Up @@ -31,11 +31,6 @@ import org.knora.webapi.messages.util.rdf._

sealed trait TriplestoreRequest extends StoreRequest

/**
* Simple message for initial actor functionality.
*/
case class HelloTriplestore(txt: String) extends TriplestoreRequest

/**
* Simple message for checking the connection to the triplestore.
*/
Expand All @@ -51,7 +46,7 @@ case class CheckConnectionACK()
*
* @param sparql the SPARQL string.
*/
case class SparqlSelectRequest(sparql: String) extends TriplestoreRequest
case class SparqlSelectRequest(sparql: String, isGravsearch: Boolean = false) extends TriplestoreRequest

/**
* Represents a SPARQL CONSTRUCT query to be sent to the triplestore. A successful response will be a
Expand Down Expand Up @@ -90,7 +85,7 @@ case class SparqlConstructResponse(statements: Map[IRI, Seq[(IRI, String)]])
*
* @param sparql the SPARQL string.
*/
case class SparqlExtendedConstructRequest(sparql: String) extends TriplestoreRequest
case class SparqlExtendedConstructRequest(sparql: String, isGravsearch: Boolean = false) extends TriplestoreRequest

/**
* Parses Turtle documents and converts them to [[SparqlExtendedConstructResponse]] objects.
Expand Down
Expand Up @@ -446,7 +446,7 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand

countResponse: SparqlSelectResult <-
appActor
.ask(SparqlSelectRequest(triplestoreSpecificCountQuery.toSparql))
.ask(SparqlSelectRequest(triplestoreSpecificCountQuery.toSparql, isGravsearch = true))
.mapTo[SparqlSelectResult]

// query response should contain one result with one row with the name "count"
Expand Down Expand Up @@ -543,8 +543,9 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand
triplestoreSpecificPrequerySparql = triplestoreSpecificPrequery.toSparql
_ = log.debug(triplestoreSpecificPrequerySparql)

start = System.currentTimeMillis()
tryPrequeryResponseNotMerged = Try(appActor.ask(SparqlSelectRequest(triplestoreSpecificPrequerySparql)))
start = System.currentTimeMillis()
tryPrequeryResponseNotMerged =
Try(appActor.ask(SparqlSelectRequest(triplestoreSpecificPrequerySparql, isGravsearch = true)))
prequeryResponseNotMerged <-
(tryPrequeryResponseNotMerged match {
case Failure(exception) => {
Expand Down Expand Up @@ -655,7 +656,8 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand
appActor
.ask(
SparqlExtendedConstructRequest(
sparql = triplestoreSpecificMainQuerySparql
sparql = triplestoreSpecificMainQuerySparql,
isGravsearch = true
)
)
.mapTo[SparqlExtendedConstructResponse]
Expand Down
Expand Up @@ -1524,7 +1524,7 @@ class ResourcesRouteV1(routeData: KnoraRouteData) extends KnoraRoute(routeData)
settings = settings,
appActor = appActor,
log = log
)(timeout = settings.triplestoreUpdateTimeout, executionContext = executionContext)
)(timeout = settings.defaultTimeout, executionContext = executionContext)
}
}
} ~ path("v1" / "resources" / "xmlimportschemas" / Segment) { internalOntologyIri =>
Expand Down
Expand Up @@ -202,9 +202,6 @@ class KnoraSettingsImpl(config: Config, log: Logger) extends Extension {
val triplestoreType: String = config.getString("app.triplestore.dbtype")
val triplestoreHost: String = config.getString("app.triplestore.host")

val triplestoreQueryTimeout: FiniteDuration = getFiniteDuration("app.triplestore.query-timeout", config)
val triplestoreUpdateTimeout: FiniteDuration = getFiniteDuration("app.triplestore.update-timeout", config)

val triplestoreUseHttps: Boolean = config.getBoolean("app.triplestore.use-https")

val triplestoreAutoInit: Boolean = config.getBoolean("app.triplestore.auto-init")
Expand Down
Expand Up @@ -46,10 +46,10 @@ final case class TriplestoreServiceManager(
updater: RepositoryUpdater
) {
def receive(message: TriplestoreRequest) = message match {
case UpdateRepositoryRequest() => updater.maybeUpdateRepository
case SparqlSelectRequest(sparql: String) => ts.sparqlHttpSelect(sparql)
case sparqlConstructRequest: SparqlConstructRequest =>
ts.sparqlHttpConstruct(sparqlConstructRequest)
case UpdateRepositoryRequest() => updater.maybeUpdateRepository
case SparqlSelectRequest(sparql: String, isGravsearch: Boolean) =>
ts.sparqlHttpSelect(sparql = sparql, isGravsearch = isGravsearch)
case sparqlConstructRequest: SparqlConstructRequest => ts.sparqlHttpConstruct(sparqlConstructRequest)
case sparqlExtendedConstructRequest: SparqlExtendedConstructRequest =>
ts.sparqlHttpExtendedConstruct(sparqlExtendedConstructRequest)
case SparqlConstructFileRequest(
Expand Down
Expand Up @@ -35,11 +35,16 @@ trait TriplestoreService {
/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @param isGravsearch if `true`, takes a long timeout because gravsearch queries can take a long time.
* @return a [[SparqlSelectResult]].
*/
def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): UIO[SparqlSelectResult]
def sparqlHttpSelect(
sparql: String,
simulateTimeout: Boolean = false,
isGravsearch: Boolean = false
): UIO[SparqlSelectResult]

/**
* Given a SPARQL CONSTRUCT query string, runs the query, returning the result as a [[SparqlConstructResponse]].
Expand Down
Expand Up @@ -82,51 +82,26 @@ case class TriplestoreServiceHttpConnectorImpl(
new UsernamePasswordCredentials(config.triplestore.fuseki.username, config.triplestore.fuseki.password)
)

// Reading data should be quick, except when it is not ;-)
private val queryTimeoutMillis = config.triplestore.queryTimeoutAsDuration.toMillis.toInt
// timeouts sent to Fuseki
private val queryTimeoutString = config.triplestore.queryTimeoutAsDuration.toSeconds.toInt.toString
private val gravsearchTimeoutString = config.triplestore.gravsearchTimeoutAsDuration.toSeconds.toInt.toString

// the client config used for queries to the triplestore. The timeout has to be larger than
// config.triplestore.queryTimeoutAsDuration and config.triplestore.gravsearchTimeoutAsDuration.
private val requestTimeoutMillis = 7200000 // 2 hours

private val queryRequestConfig = RequestConfig
.custom()
.setConnectTimeout(queryTimeoutMillis)
.setConnectionRequestTimeout(queryTimeoutMillis)
.setSocketTimeout(queryTimeoutMillis)
.setConnectTimeout(requestTimeoutMillis)
.setConnectionRequestTimeout(requestTimeoutMillis)
.setSocketTimeout(requestTimeoutMillis)
.build

private val queryHttpClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(queryRequestConfig)
.build

// Some updates could take a while.
private val updateTimeoutMillis = config.triplestore.updateTimeoutAsDuration.toMillis.toInt

private val updateTimeoutConfig = RequestConfig
.custom()
.setConnectTimeout(updateTimeoutMillis)
.setConnectionRequestTimeout(updateTimeoutMillis)
.setSocketTimeout(updateTimeoutMillis)
.build

private val updateHttpClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(updateTimeoutConfig)
.build

// For updates that could take a very long time.
private val longUpdateTimeoutMillis = updateTimeoutMillis * 10

private val longRequestConfig = RequestConfig
.custom()
.setConnectTimeout(longUpdateTimeoutMillis)
.setConnectionRequestTimeout(longUpdateTimeoutMillis)
.setSocketTimeout(longUpdateTimeoutMillis)
.build

private val longRequestClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(longRequestConfig)
.build

private val dbName = config.triplestore.fuseki.repositoryName
private val queryPath = s"/${dbName}/query"
private val sparqlUpdatePath = s"/${dbName}/update"
Expand All @@ -153,12 +128,16 @@ case class TriplestoreServiceHttpConnectorImpl(
/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @param isGravsearch `true` if it is a gravsearch query (relevant for timeout)
* @return a [[SparqlSelectResult]].
*/
def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): UIO[SparqlSelectResult] = {

def sparqlHttpSelect(
sparql: String,
simulateTimeout: Boolean = false,
isGravsearch: Boolean = false
): UIO[SparqlSelectResult] = {
def parseJsonResponse(sparql: String, resultStr: String): IO[TriplestoreException, SparqlSelectResult] =
ZIO
.attemptBlocking(resultStr.parseJson.convertTo[SparqlSelectResult])
Expand All @@ -182,7 +161,7 @@ case class TriplestoreServiceHttpConnectorImpl(

for {
resultStr <-
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout)
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout, isGravsearch = isGravsearch)

// Parse the response as a JSON object and generate a response message.
responseMessage <- parseJsonResponse(sparql, resultStr).orDie
Expand All @@ -196,7 +175,6 @@ case class TriplestoreServiceHttpConnectorImpl(
* @return a [[SparqlConstructResponse]]
*/
def sparqlHttpConstruct(sparqlConstructRequest: SparqlConstructRequest): UIO[SparqlConstructResponse] = {
// println(logDelimiter + sparql)

val rdfFormatUtil: RdfFormatUtil = RdfFeatureFactory.getRdfFormatUtil()

Expand Down Expand Up @@ -299,6 +277,7 @@ case class TriplestoreServiceHttpConnectorImpl(
turtleStr <- getSparqlHttpResponse(
sparqlExtendedConstructRequest.sparql,
isUpdate = false,
isGravsearch = sparqlExtendedConstructRequest.isGravsearch,
acceptMimeType = mimeTypeTextTurtle
)

Expand All @@ -322,7 +301,6 @@ case class TriplestoreServiceHttpConnectorImpl(
* @return a [[SparqlUpdateResponse]].
*/
def sparqlHttpUpdate(sparqlUpdate: String): UIO[SparqlUpdateResponse] =
// println(logDelimiter + sparqlUpdate)
for {
// Send the request to the triplestore.
_ <- getSparqlHttpResponse(sparqlUpdate, isUpdate = true)
Expand Down Expand Up @@ -453,7 +431,7 @@ case class TriplestoreServiceHttpConnectorImpl(
httpContext <- makeHttpContext.orDie
_ <- ZIO.foreachDiscard(request)(elem =>
doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = elem._1,
context = httpContext,
processResponse = elem._2
Expand Down Expand Up @@ -541,7 +519,7 @@ case class TriplestoreServiceHttpConnectorImpl(
req <- httpGet
ctx <- makeHttpContext
res <- doHttpRequest(
client = updateHttpClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnResponseAsString
Expand Down Expand Up @@ -579,7 +557,7 @@ case class TriplestoreServiceHttpConnectorImpl(
request <- httpPost.orDie
httpContext <- makeHttpContext.orDie
_ <- doHttpRequest(
client = updateHttpClient,
client = queryHttpClient,
request = request,
context = httpContext,
processResponse = returnUploadResponse
Expand Down Expand Up @@ -664,27 +642,24 @@ case class TriplestoreServiceHttpConnectorImpl(

/**
* Submits a SPARQL request to the triplestore and returns the response as a string.
* According to the request type a different timeout are used.
*
* @param sparql the SPARQL request to be submitted.
* @param isUpdate `true` if this is an update request.
* @param isGravsearch `true` if this is a Gravsearch query (needs a greater timeout).
* @param acceptMimeType the MIME type to be provided in the HTTP Accept header.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return the triplestore's response.
*/
private def getSparqlHttpResponse(
sparql: String,
isUpdate: Boolean,
isGravsearch: Boolean = false,
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson,
simulateTimeout: Boolean = false
): UIO[String] = {

val httpClient = ZIO.attempt {
if (isUpdate) {
updateHttpClient
} else {
queryHttpClient
}
}
val httpClient = ZIO.attempt(queryHttpClient)

val httpPost = ZIO.attempt {
if (isUpdate) {
Expand All @@ -694,10 +669,13 @@ case class TriplestoreServiceHttpConnectorImpl(
updateHttpPost.setEntity(requestEntity)
updateHttpPost
} else {
// Send queries as application/x-www-form-urlencoded (as per SPARQL 1.1 Protocol §2.1.2,
// "query via POST with URL-encoded parameters"), so we can include the "infer" parameter when using GraphDB.
// Send queries as application/x-www-form-urlencoded (as per SPARQL 1.1 Protocol §2.1.2, "query via POST with URL-encoded parameters").
val formParams = new util.ArrayList[NameValuePair]()
formParams.add(new BasicNameValuePair("query", sparql))
// in case of a gravsearch query, a specific (longer) timeout is set
formParams.add(
new BasicNameValuePair("timeout", (if (isGravsearch) gravsearchTimeoutString else queryTimeoutString))
)
val requestEntity: UrlEncodedFormEntity = new UrlEncodedFormEntity(formParams, Consts.UTF_8)
val queryHttpPost: HttpPost = new HttpPost(queryPath)
queryHttpPost.setEntity(requestEntity)
Expand Down Expand Up @@ -741,7 +719,7 @@ case class TriplestoreServiceHttpConnectorImpl(
ctx <- makeHttpContext.orDie
req <- httpGet.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = writeResponseFileAsPlainContent(outputFile)
Expand All @@ -767,7 +745,7 @@ case class TriplestoreServiceHttpConnectorImpl(
ctx <- makeHttpContext.orDie
req <- httpPost.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnUploadResponse
Expand Down Expand Up @@ -796,7 +774,7 @@ case class TriplestoreServiceHttpConnectorImpl(
ctx <- makeHttpContext.orDie
req <- httpPut.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnInsertGraphDataResponse(graphName)
Expand Down Expand Up @@ -840,8 +818,6 @@ case class TriplestoreServiceHttpConnectorImpl(
simulateTimeout: Boolean = false
): UIO[T] = {

// TODO: Can we make Fuseki abandon the query if it takes too long?

def checkSimulateTimeout(): UIO[Unit] =
if (simulateTimeout) {
ZIO.die(
Expand Down
Expand Up @@ -56,8 +56,7 @@ import org.knora.webapi.util._
class ResourcesRouteV2E2ESpec extends E2ESpec(ResourcesRouteV2E2ESpec.config) {
private implicit val stringFormatter: StringFormatter = StringFormatter.getGeneralInstance

implicit def default(implicit system: ActorSystem): RouteTestTimeout =
RouteTestTimeout(settings.triplestoreUpdateTimeout)
implicit def default(implicit system: ActorSystem): RouteTestTimeout = RouteTestTimeout(settings.defaultTimeout)

implicit val ec: ExecutionContextExecutor = system.dispatcher

Expand Down

0 comments on commit 90f86b5

Please sign in to comment.