feat(store): Return a clearer exception when a triplestore read timeo…
…ut occurs. (#1795)
Benjamin Geer committed Jan 26, 2021
1 parent 69ae7fd commit 0eeb3b3
Showing 5 changed files with 117 additions and 8 deletions.
14 changes: 14 additions & 0 deletions webapi/src/main/scala/org/knora/webapi/exceptions/Exceptions.scala
Expand Up @@ -299,6 +299,20 @@ object TriplestoreConnectionException {
TriplestoreConnectionException(message, Some(ExceptionUtil.logAndWrapIfNotSerializable(e, log)))

* Indicates that a read timeout occurred while waiting for data from the triplestore.
* @param message a description of the error.
* @param cause the original exception representing the cause of the error, if any.
case class TriplestoreTimeoutException(message: String, cause: Option[Throwable] = None)
extends TriplestoreException(message, cause)

object TriplestoreTimeoutException {
def apply(message: String, e: Throwable, log: LoggingAdapter): TriplestoreTimeoutException =
TriplestoreTimeoutException(message, Some(ExceptionUtil.logAndWrapIfNotSerializable(e, log)))

* Indicates that we tried using a feature which is unsuported by the selected triplestore.
Expand Up @@ -355,6 +355,11 @@ case class CheckTriplestoreRequest() extends TriplestoreRequest
case class CheckTriplestoreResponse(triplestoreStatus: TriplestoreStatus, msg: String)

* Simulates a triplestore timeout. Used only in testing.
case class SimulateTimeoutRequest() extends TriplestoreRequest

* Requests that the repository is updated to be compatible with the running version of Knora.
Expand Up @@ -231,18 +231,31 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
case UploadRepositoryRequest(inputFile: Path) => try2Message(sender(), uploadRepository(inputFile), log)
case InsertGraphDataContentRequest(graphContent: String, graphName: String) =>
try2Message(sender(), insertDataGraphRequest(graphContent, graphName), log)
case SimulateTimeoutRequest() => try2Message(sender(), doSimulateTimeout(), log)
case other =>
sender ! Status.Failure(
UnexpectedMessageException(s"Unexpected message $other of type ${other.getClass.getCanonicalName}"))

* Simulates a read timeout.
private def doSimulateTimeout(): Try[SparqlSelectResult] = {
val sparql = """SELECT ?foo WHERE {
| BIND("foo" AS ?foo)

sparqlHttpSelect(sparql = sparql, simulateTimeout = true)

* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return a [[SparqlSelectResult]].
private def sparqlHttpSelect(sparql: String): Try[SparqlSelectResult] = {
private def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): Try[SparqlSelectResult] = {
def parseJsonResponse(sparql: String, resultStr: String): Try[SparqlSelectResult] = {
val parseTry = Try {
Expand All @@ -265,7 +278,7 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
} else {
// No: get the response from the real triplestore over HTTP.
getSparqlHttpResponse(sparql, isUpdate = false)
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout)

// Are we preparing a fake triplestore?
Expand Down Expand Up @@ -833,11 +846,13 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param sparql the SPARQL request to be submitted.
* @param isUpdate `true` if this is an update request.
* @param acceptMimeType the MIME type to be provided in the HTTP Accept header.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return the triplestore's response.
private def getSparqlHttpResponse(sparql: String,
isUpdate: Boolean,
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson): Try[String] = {
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson,
simulateTimeout: Boolean = false): Try[String] = {

val httpContext: HttpClientContext = makeHttpContext

Expand Down Expand Up @@ -869,7 +884,8 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
client = httpClient,
request = httpPost,
context = httpContext,
processResponse = returnResponseAsString
processResponse = returnResponseAsString,
simulateTimeout = simulateTimeout

Expand Down Expand Up @@ -990,18 +1006,24 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param request the request to be sent.
* @param context the request context to be used.
* @param processResponse a function that processes the HTTP response.
* @param simulateTimeout if `true`, simulate a read timeout.
* @tparam T the return type of `processResponse`.
* @return the return value of `processResponse`.
private def doHttpRequest[T](client: CloseableHttpClient,
request: HttpRequest,
context: HttpClientContext,
processResponse: CloseableHttpResponse => T): Try[T] = {
processResponse: CloseableHttpResponse => T,
simulateTimeout: Boolean = false): Try[T] = {
// Make an Option wrapper for the response, so we can close it if we get one,
// even if an error occurs.
var maybeResponse: Option[CloseableHttpResponse] = None

val triplestoreResponseTry = Try {
if (simulateTimeout) {
throw new"Simulated read timeout")

val start = System.currentTimeMillis()
val response = client.execute(targetHost, request, context)
maybeResponse = Some(response)
Expand All @@ -1028,15 +1050,21 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat


// TODO: Can we throw a more user-friendly exception if the query timed out?
// TODO: Can we make Fuseki abandon the query if it takes too long?

triplestoreResponseTry.recover {
case tre: TriplestoreResponseException => throw tre

case socketTimeoutException: =>
val message =
"The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help."
log.error(socketTimeoutException, message)
throw TriplestoreTimeoutException(message = message, e = socketTimeoutException, log = log)

case e: Exception =>
log.error(e, s"Failed to connect to triplestore")
throw TriplestoreConnectionException(s"Failed to connect to triplestore", e, log)
val message = "Failed to connect to triplestore"
log.error(e, message)
throw TriplestoreConnectionException(message = message, e = e, log = log)

Expand Up @@ -21,6 +21,24 @@ scala_test(

name = "HttpTriplestoreConnectorSpec",
size = "small",
srcs = [
data = [
jvm_flags = ["-Dconfig.resource=fuseki.conf"],
# unused_dependency_checker_mode = "warn",

name = "GraphDBConsistencyCheckingSpec",
size = "small", # 60s
@@ -0,0 +1,44 @@
* Copyright © 2015-2021 the contributors (see
* This file is part of Knora.
* Knora is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* Knora is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU Affero General Public License for more details.
* You should have received a copy of the GNU Affero General Public
* License along with Knora. If not, see <>.


import akka.testkit.ImplicitSender
import org.knora.webapi.CoreSpec
import org.knora.webapi.exceptions.TriplestoreTimeoutException

import scala.concurrent.duration._

class HttpTriplestoreConnectorSpec extends CoreSpec() with ImplicitSender {
private val timeout = 10.seconds

"The HttpTriplestoreConnector" should {
"report a connection timeout with an appropriate error message" in {
storeManager ! SimulateTimeoutRequest()

expectMsgPF(timeout) {
case msg: =>
msg.cause.getMessage == "The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help.")

