fix: Fuseki doesn't stop after client's timeout (DEV-1190) #2175

Merged
merged 15 commits on Aug 26, 2022
2 changes: 1 addition & 1 deletion Makefile
@@ -278,7 +278,7 @@ init-db-test-from-staging: db_staging_dump.trig init-db-test-empty ## init local
init-db-test-from-prod: db_prod_dump.trig init-db-test-empty ## init local database with data from production
@echo $@
@curl -X POST -H "Content-Type: application/sparql-update" -d "DROP ALL" -u "admin:test" "http://localhost:3030/knora-test"
@curl -X POST -H "Content-Type: application/trig" --data-binary "@${CURRENT_DIR}/db_prod_dump.trig" -u "admin:test" "http://localhost:3030/knora-test"
@curl -X POST -H "Content-Type: application/trig" -T "${CURRENT_DIR}/db_prod_dump.trig" -u "admin:test" "http://localhost:3030/knora-test"

.PHONY: init-db-test-from-ls-test-server
init-db-test-from-ls-test-server: db_ls_test_server_dump.trig init-db-test-from-ls-test-server-trig-file ## init local database with data from ls-test-server
6 changes: 3 additions & 3 deletions webapi/src/main/resources/application.conf
@@ -242,7 +242,7 @@ app {
print-extended-config = ${?KNORA_WEBAPI_PRINT_EXTENDED_CONFIG}

// default ask timeout. can be same or lower then akka.http.server.request-timeout.
default-timeout = 120 minutes
default-timeout = 120 minutes // a timeout here should never happen

// If true, log all messages sent from and received by routes. Since messages are logged at DEBUG level, you will
// need to set loglevel = "DEBUG" in the akka section of this file, and <root level="DEBUG"> in logback.xml.
@@ -457,8 +457,8 @@ app {
// timeout for triplestore queries. can be same or lower then akka.http.server.request-timeout.
query-timeout = 20 seconds

// timeout for tripelstore updates. can be same or lower then akka.http.server.request-timeout.
update-timeout = 120 minutes
// timeout for Gravsearch queries. can be the same as or lower than akka.http.server.request-timeout.
gravsearch-timeout = 120 seconds

// triplestore auto init. initialize triplestore at startup if necessary.
auto-init = false
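The two timeout settings above are what the HTTP connector further down converts and forwards to Fuseki. As a rough illustration (not part of the PR), the HOCON duration strings can be parsed as shown below; the keys and values are copied from the diff above, while the object name is made up:

```scala
// Illustration only: parsing the triplestore timeouts shown above.
// Keys and values come from application.conf in this PR; the object name is hypothetical.
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.Duration

object TriplestoreTimeouts extends App {
  val config = ConfigFactory.parseString(
    """
      |app.triplestore.query-timeout      = 20 seconds
      |app.triplestore.gravsearch-timeout = 120 seconds
      |""".stripMargin
  )

  // scala.concurrent.duration.Duration understands strings like "20 seconds"
  val queryTimeout      = Duration(config.getString("app.triplestore.query-timeout"))
  val gravsearchTimeout = Duration(config.getString("app.triplestore.gravsearch-timeout"))

  println(s"query-timeout:      ${queryTimeout.toSeconds} s") // 20
  println(s"gravsearch-timeout: ${gravsearchTimeout.toSeconds} s") // 120
}
```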
@@ -88,13 +88,13 @@ final case class Triplestore(
useHttps: Boolean,
host: String,
queryTimeout: String,
updateTimeout: String,
gravsearchTimeout: String,
autoInit: Boolean,
profileQueries: Boolean,
fuseki: Fuseki
) {
val queryTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(queryTimeout))
val updateTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(updateTimeout))
val queryTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(queryTimeout))
val gravsearchTimeoutAsDuration = zio.Duration.fromScala(scala.concurrent.duration.Duration(gravsearchTimeout))
}

/**
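The two vals above convert the raw duration strings eagerly at construction time. A minimal standalone sketch of that conversion, using the same example values as the config above (only calls already visible in this diff):

```scala
// Minimal sketch of the conversion performed by queryTimeoutAsDuration / gravsearchTimeoutAsDuration above.
import scala.concurrent.duration.{Duration => ScalaDuration}

object DurationConversion extends App {
  val queryTimeout      = zio.Duration.fromScala(ScalaDuration("20 seconds"))
  val gravsearchTimeout = zio.Duration.fromScala(ScalaDuration("120 seconds"))

  // toSeconds is what the HTTP connector below uses to build the Fuseki "timeout" parameter
  println(queryTimeout.toSeconds)      // 20
  println(gravsearchTimeout.toSeconds) // 120
}
```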
@@ -31,11 +31,6 @@ import org.knora.webapi.messages.util.rdf._

sealed trait TriplestoreRequest extends StoreRequest

/**
* Simple message for initial actor functionality.
*/
case class HelloTriplestore(txt: String) extends TriplestoreRequest

/**
* Simple message for checking the connection to the triplestore.
*/
@@ -51,7 +46,7 @@ case class CheckConnectionACK()
*
* @param sparql the SPARQL string.
*/
case class SparqlSelectRequest(sparql: String) extends TriplestoreRequest
case class SparqlSelectRequest(sparql: String, isGravsearch: Boolean = false) extends TriplestoreRequest

/**
* Represents a SPARQL CONSTRUCT query to be sent to the triplestore. A successful response will be a
@@ -90,7 +85,7 @@ case class SparqlConstructResponse(statements: Map[IRI, Seq[(IRI, String)]])
*
* @param sparql the SPARQL string.
*/
case class SparqlExtendedConstructRequest(sparql: String) extends TriplestoreRequest
case class SparqlExtendedConstructRequest(sparql: String, isGravsearch: Boolean = false) extends TriplestoreRequest

/**
* Parses Turtle documents and converts them to [[SparqlExtendedConstructResponse]] objects.
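Since `isGravsearch` defaults to `false` on both messages, existing senders stay source-compatible and only the Gravsearch code paths opt in. A standalone sketch (the case class is repeated here only so the snippet compiles on its own; in the real code it also extends `TriplestoreRequest`):

```scala
// Sketch only: mirrors the extended message definition above to show the default flag in action.
final case class SparqlSelectRequest(sparql: String, isGravsearch: Boolean = false)

object GravsearchFlagDemo extends App {
  val ordinary   = SparqlSelectRequest("SELECT * WHERE { ?s ?p ?o } LIMIT 10") // existing call sites: flag stays false
  val gravsearch = SparqlSelectRequest("SELECT ...", isGravsearch = true)      // Gravsearch count query, prequery, main query

  println(ordinary.isGravsearch)   // false
  println(gravsearch.isGravsearch) // true
}
```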
@@ -446,7 +446,7 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand

countResponse: SparqlSelectResult <-
appActor
.ask(SparqlSelectRequest(triplestoreSpecificCountQuery.toSparql))
.ask(SparqlSelectRequest(triplestoreSpecificCountQuery.toSparql, isGravsearch = true))
.mapTo[SparqlSelectResult]

// query response should contain one result with one row with the name "count"
@@ -543,8 +543,9 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand
triplestoreSpecificPrequerySparql = triplestoreSpecificPrequery.toSparql
_ = log.debug(triplestoreSpecificPrequerySparql)

start = System.currentTimeMillis()
tryPrequeryResponseNotMerged = Try(appActor.ask(SparqlSelectRequest(triplestoreSpecificPrequerySparql)))
start = System.currentTimeMillis()
tryPrequeryResponseNotMerged =
Try(appActor.ask(SparqlSelectRequest(triplestoreSpecificPrequerySparql, isGravsearch = true)))
prequeryResponseNotMerged <-
(tryPrequeryResponseNotMerged match {
case Failure(exception) => {
@@ -655,7 +656,8 @@ class SearchResponderV2(responderData: ResponderData) extends ResponderWithStand
appActor
.ask(
SparqlExtendedConstructRequest(
sparql = triplestoreSpecificMainQuerySparql
sparql = triplestoreSpecificMainQuerySparql,
isGravsearch = true
)
)
.mapTo[SparqlExtendedConstructResponse]
@@ -1524,7 +1524,7 @@ class ResourcesRouteV1(routeData: KnoraRouteData) extends KnoraRoute(routeData)
settings = settings,
appActor = appActor,
log = log
)(timeout = settings.triplestoreUpdateTimeout, executionContext = executionContext)
)(timeout = settings.defaultTimeout, executionContext = executionContext)
}
}
} ~ path("v1" / "resources" / "xmlimportschemas" / Segment) { internalOntologyIri =>
@@ -202,9 +202,6 @@ class KnoraSettingsImpl(config: Config, log: Logger) extends Extension {
val triplestoreType: String = config.getString("app.triplestore.dbtype")
val triplestoreHost: String = config.getString("app.triplestore.host")

val triplestoreQueryTimeout: FiniteDuration = getFiniteDuration("app.triplestore.query-timeout", config)
val triplestoreUpdateTimeout: FiniteDuration = getFiniteDuration("app.triplestore.update-timeout", config)

val triplestoreUseHttps: Boolean = config.getBoolean("app.triplestore.use-https")

val triplestoreAutoInit: Boolean = config.getBoolean("app.triplestore.auto-init")
@@ -46,10 +46,10 @@ final case class TriplestoreServiceManager(
updater: RepositoryUpdater
) {
def receive(message: TriplestoreRequest) = message match {
case UpdateRepositoryRequest() => updater.maybeUpdateRepository
case SparqlSelectRequest(sparql: String) => ts.sparqlHttpSelect(sparql)
case sparqlConstructRequest: SparqlConstructRequest =>
ts.sparqlHttpConstruct(sparqlConstructRequest)
case UpdateRepositoryRequest() => updater.maybeUpdateRepository
case SparqlSelectRequest(sparql: String, isGravsearch: Boolean) =>
ts.sparqlHttpSelect(sparql = sparql, isGravsearch = isGravsearch)
case sparqlConstructRequest: SparqlConstructRequest => ts.sparqlHttpConstruct(sparqlConstructRequest)
case sparqlExtendedConstructRequest: SparqlExtendedConstructRequest =>
ts.sparqlHttpExtendedConstruct(sparqlExtendedConstructRequest)
case SparqlConstructFileRequest(
@@ -35,11 +35,16 @@ trait TriplestoreService {
/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @param isGravsearch if `true`, the longer Gravsearch timeout is applied, since Gravsearch queries can run for a long time.
* @return a [[SparqlSelectResult]].
*/
def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): UIO[SparqlSelectResult]
def sparqlHttpSelect(
sparql: String,
simulateTimeout: Boolean = false,
isGravsearch: Boolean = false
): UIO[SparqlSelectResult]

/**
* Given a SPARQL CONSTRUCT query string, runs the query, returning the result as a [[SparqlConstructResponse]].
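A hedged call-site sketch of the extended trait method; `service` is an assumed `TriplestoreService` instance wired up elsewhere in the webapi, and nothing in the snippet is introduced by this PR:

```scala
// Call-site sketch, assuming the webapi types above (TriplestoreService, SparqlSelectResult) are in scope.
import zio.UIO

def runGravsearchPrequery(service: TriplestoreService, prequerySparql: String): UIO[SparqlSelectResult] =
  service.sparqlHttpSelect(
    sparql       = prequerySparql,
    isGravsearch = true // opt in to the longer gravsearch-timeout when the query reaches Fuseki
  )
```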
@@ -82,51 +82,26 @@ case class TriplestoreServiceHttpConnectorImpl(
new UsernamePasswordCredentials(config.triplestore.fuseki.username, config.triplestore.fuseki.password)
)

// Reading data should be quick, except when it is not ;-)
private val queryTimeoutMillis = config.triplestore.queryTimeoutAsDuration.toMillis.toInt
// timeouts sent to Fuseki
private val queryTimeoutString = config.triplestore.queryTimeoutAsDuration.toSeconds.toInt.toString
private val gravsearchTimeoutString = config.triplestore.gravsearchTimeoutAsDuration.toSeconds.toInt.toString

// the client config used for queries to the triplestore. The timeout has to be larger than
// config.triplestore.queryTimeoutAsDuration and config.triplestore.gravsearchTimeoutAsDuration.
private val requestTimeoutMillis = 7200000 // 2 hours

private val queryRequestConfig = RequestConfig
.custom()
.setConnectTimeout(queryTimeoutMillis)
.setConnectionRequestTimeout(queryTimeoutMillis)
.setSocketTimeout(queryTimeoutMillis)
.setConnectTimeout(requestTimeoutMillis)
.setConnectionRequestTimeout(requestTimeoutMillis)
.setSocketTimeout(requestTimeoutMillis)
.build

private val queryHttpClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(queryRequestConfig)
.build

// Some updates could take a while.
private val updateTimeoutMillis = config.triplestore.updateTimeoutAsDuration.toMillis.toInt

private val updateTimeoutConfig = RequestConfig
.custom()
.setConnectTimeout(updateTimeoutMillis)
.setConnectionRequestTimeout(updateTimeoutMillis)
.setSocketTimeout(updateTimeoutMillis)
.build

private val updateHttpClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(updateTimeoutConfig)
.build

// For updates that could take a very long time.
private val longUpdateTimeoutMillis = updateTimeoutMillis * 10

private val longRequestConfig = RequestConfig
.custom()
.setConnectTimeout(longUpdateTimeoutMillis)
.setConnectionRequestTimeout(longUpdateTimeoutMillis)
.setSocketTimeout(longUpdateTimeoutMillis)
.build

private val longRequestClient: CloseableHttpClient = HttpClients.custom
.setDefaultCredentialsProvider(credsProvider)
.setDefaultRequestConfig(longRequestConfig)
.build

private val dbName = config.triplestore.fuseki.repositoryName
private val queryPath = s"/${dbName}/query"
private val sparqlUpdatePath = s"/${dbName}/update"
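The comment above fixes the client-side request timeout at two hours precisely so that it always exceeds both Fuseki-side timeouts, leaving Fuseki, not the HTTP client, to abort a runaway query. A throwaway sanity-check sketch of that constraint, using the values from this PR:

```scala
// Throwaway sanity check of the constraint described above.
object TimeoutConstraintCheck extends App {
  val requestTimeoutMillis    = 7200000L     // hard-coded 2 hours, as above
  val queryTimeoutMillis      = 20L * 1000L  // app.triplestore.query-timeout      = 20 seconds
  val gravsearchTimeoutMillis = 120L * 1000L // app.triplestore.gravsearch-timeout = 120 seconds

  require(requestTimeoutMillis > queryTimeoutMillis, "client timeout must exceed query-timeout")
  require(requestTimeoutMillis > gravsearchTimeoutMillis, "client timeout must exceed gravsearch-timeout")
  println("client-side request timeout safely exceeds both Fuseki timeouts")
}
```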
@@ -153,12 +128,16 @@
/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @param isGravsearch `true` if this is a Gravsearch query (the longer timeout is applied).
* @return a [[SparqlSelectResult]].
*/
def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): UIO[SparqlSelectResult] = {

def sparqlHttpSelect(
sparql: String,
simulateTimeout: Boolean = false,
isGravsearch: Boolean = false
): UIO[SparqlSelectResult] = {
def parseJsonResponse(sparql: String, resultStr: String): IO[TriplestoreException, SparqlSelectResult] =
ZIO
.attemptBlocking(resultStr.parseJson.convertTo[SparqlSelectResult])
@@ -182,7 +161,7 @@

for {
resultStr <-
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout)
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout, isGravsearch = isGravsearch)

// Parse the response as a JSON object and generate a response message.
responseMessage <- parseJsonResponse(sparql, resultStr).orDie
@@ -196,7 +175,6 @@
* @return a [[SparqlConstructResponse]]
*/
def sparqlHttpConstruct(sparqlConstructRequest: SparqlConstructRequest): UIO[SparqlConstructResponse] = {
// println(logDelimiter + sparql)

val rdfFormatUtil: RdfFormatUtil = RdfFeatureFactory.getRdfFormatUtil()

@@ -299,6 +277,7 @@
turtleStr <- getSparqlHttpResponse(
sparqlExtendedConstructRequest.sparql,
isUpdate = false,
isGravsearch = sparqlExtendedConstructRequest.isGravsearch,
acceptMimeType = mimeTypeTextTurtle
)

@@ -322,7 +301,6 @@
* @return a [[SparqlUpdateResponse]].
*/
def sparqlHttpUpdate(sparqlUpdate: String): UIO[SparqlUpdateResponse] =
// println(logDelimiter + sparqlUpdate)
for {
// Send the request to the triplestore.
_ <- getSparqlHttpResponse(sparqlUpdate, isUpdate = true)
@@ -453,7 +431,7 @@
httpContext <- makeHttpContext.orDie
_ <- ZIO.foreachDiscard(request)(elem =>
doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = elem._1,
context = httpContext,
processResponse = elem._2
@@ -541,7 +519,7 @@
req <- httpGet
ctx <- makeHttpContext
res <- doHttpRequest(
client = updateHttpClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnResponseAsString
@@ -579,7 +557,7 @@
request <- httpPost.orDie
httpContext <- makeHttpContext.orDie
_ <- doHttpRequest(
client = updateHttpClient,
client = queryHttpClient,
request = request,
context = httpContext,
processResponse = returnUploadResponse
@@ -664,27 +642,24 @@

/**
* Submits a SPARQL request to the triplestore and returns the response as a string.
* Depending on the request type, a different timeout is used.
*
* @param sparql the SPARQL request to be submitted.
* @param isUpdate `true` if this is an update request.
* @param isGravsearch `true` if this is a Gravsearch query (needs a greater timeout).
* @param acceptMimeType the MIME type to be provided in the HTTP Accept header.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return the triplestore's response.
*/
private def getSparqlHttpResponse(
sparql: String,
isUpdate: Boolean,
isGravsearch: Boolean = false,
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson,
simulateTimeout: Boolean = false
): UIO[String] = {

val httpClient = ZIO.attempt {
if (isUpdate) {
updateHttpClient
} else {
queryHttpClient
}
}
val httpClient = ZIO.attempt(queryHttpClient)

val httpPost = ZIO.attempt {
if (isUpdate) {
@@ -694,10 +669,13 @@
updateHttpPost.setEntity(requestEntity)
updateHttpPost
} else {
// Send queries as application/x-www-form-urlencoded (as per SPARQL 1.1 Protocol §2.1.2,
// "query via POST with URL-encoded parameters"), so we can include the "infer" parameter when using GraphDB.
// Send queries as application/x-www-form-urlencoded (as per SPARQL 1.1 Protocol §2.1.2, "query via POST with URL-encoded parameters").
val formParams = new util.ArrayList[NameValuePair]()
formParams.add(new BasicNameValuePair("query", sparql))
// for a Gravsearch query, the longer gravsearch-timeout is sent; otherwise the regular query-timeout
formParams.add(
new BasicNameValuePair("timeout", (if (isGravsearch) gravsearchTimeoutString else queryTimeoutString))
)
val requestEntity: UrlEncodedFormEntity = new UrlEncodedFormEntity(formParams, Consts.UTF_8)
val queryHttpPost: HttpPost = new HttpPost(queryPath)
queryHttpPost.setEntity(requestEntity)
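This query branch is the heart of the fix: the configured timeout now travels to Fuseki as a form parameter alongside the query text, so Fuseki can abandon the query itself instead of running on after the client has given up. Below is a self-contained sketch of the request body being assembled, using the same Apache HttpClient 4.x classes as above; the endpoint path and the timeout value are illustrative:

```scala
// Illustrative sketch of the POST body built above: SPARQL text plus Fuseki's "timeout"
// form parameter (in seconds, matching what this PR sends).
import java.util
import org.apache.http.{Consts, NameValuePair}
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.HttpPost
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils

object FusekiQueryWithTimeout extends App {
  val sparql         = "SELECT * WHERE { ?s ?p ?o } LIMIT 10"
  val timeoutSeconds = "120" // e.g. gravsearch-timeout; an ordinary query would send "20"

  val formParams = new util.ArrayList[NameValuePair]()
  formParams.add(new BasicNameValuePair("query", sparql))
  formParams.add(new BasicNameValuePair("timeout", timeoutSeconds))

  val queryPost = new HttpPost("/knora-test/query") // path is illustrative
  queryPost.setEntity(new UrlEncodedFormEntity(formParams, Consts.UTF_8))

  println(queryPost.getEntity.getContentType)        // Content-Type: application/x-www-form-urlencoded; charset=UTF-8
  println(EntityUtils.toString(queryPost.getEntity)) // query=...&timeout=120
}
```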
@@ -741,7 +719,7 @@
ctx <- makeHttpContext.orDie
req <- httpGet.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = writeResponseFileAsPlainContent(outputFile)
@@ -767,7 +745,7 @@
ctx <- makeHttpContext.orDie
req <- httpPost.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnUploadResponse
@@ -796,7 +774,7 @@
ctx <- makeHttpContext.orDie
req <- httpPut.orDie
res <- doHttpRequest(
client = longRequestClient,
client = queryHttpClient,
request = req,
context = ctx,
processResponse = returnInsertGraphDataResponse(graphName)
@@ -840,8 +818,6 @@
simulateTimeout: Boolean = false
): UIO[T] = {

// TODO: Can we make Fuseki abandon the query if it takes too long?

def checkSimulateTimeout(): UIO[Unit] =
if (simulateTimeout) {
ZIO.die(
@@ -56,8 +56,7 @@ import org.knora.webapi.util._
class ResourcesRouteV2E2ESpec extends E2ESpec(ResourcesRouteV2E2ESpec.config) {
private implicit val stringFormatter: StringFormatter = StringFormatter.getGeneralInstance

implicit def default(implicit system: ActorSystem): RouteTestTimeout =
RouteTestTimeout(settings.triplestoreUpdateTimeout)
implicit def default(implicit system: ActorSystem): RouteTestTimeout = RouteTestTimeout(settings.defaultTimeout)

implicit val ec: ExecutionContextExecutor = system.dispatcher
