Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): Return a clearer exception when a triplestore read timeout occurs (DSP-1242) #1795

Merged
merged 1 commit into from Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -299,6 +299,20 @@ object TriplestoreConnectionException {
TriplestoreConnectionException(message, Some(ExceptionUtil.logAndWrapIfNotSerializable(e, log)))
}

/**
  * Signals that a read timeout occurred while waiting for data from the triplestore.
  *
  * @param message a description of the error.
  * @param cause   the exception that originally caused the error, if there is one.
  */
case class TriplestoreTimeoutException(message: String, cause: Option[Throwable] = None)
    extends TriplestoreException(message, cause)

/**
  * Companion offering a constructor that takes the raw cause together with a logger.
  */
object TriplestoreTimeoutException {

  /**
    * Builds a [[TriplestoreTimeoutException]] from a raw cause.
    *
    * The cause is first passed through `ExceptionUtil.logAndWrapIfNotSerializable`; whatever that
    * helper returns is stored as the exception's cause.
    *
    * @param message a description of the error.
    * @param e       the exception that caused the error.
    * @param log     the logging adapter handed to `ExceptionUtil.logAndWrapIfNotSerializable`.
    * @return the new exception.
    */
  def apply(message: String, e: Throwable, log: LoggingAdapter): TriplestoreTimeoutException = {
    val preparedCause = ExceptionUtil.logAndWrapIfNotSerializable(e, log)
    TriplestoreTimeoutException(message = message, cause = Some(preparedCause))
  }
}

/**
* Indicates that we tried using a feature which is unsupported by the selected triplestore.
*
Expand Down
Expand Up @@ -355,6 +355,11 @@ case class CheckTriplestoreRequest() extends TriplestoreRequest
*/
case class CheckTriplestoreResponse(triplestoreStatus: TriplestoreStatus, msg: String)

/**
* Simulates a triplestore timeout. Used only in testing.
*
* `HttpTriplestoreConnector` handles this message by calling `doSimulateTimeout()`, which runs a
* trivial SELECT with `simulateTimeout = true`. `HttpTriplestoreConnectorSpec` expects the reply
* to be a `Status.Failure` whose cause is a `TriplestoreTimeoutException`.
*/
case class SimulateTimeoutRequest() extends TriplestoreRequest

/**
* Requests that the repository is updated to be compatible with the running version of Knora.
*/
Expand Down
Expand Up @@ -231,18 +231,31 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
case UploadRepositoryRequest(inputFile: Path) => try2Message(sender(), uploadRepository(inputFile), log)
case InsertGraphDataContentRequest(graphContent: String, graphName: String) =>
try2Message(sender(), insertDataGraphRequest(graphContent, graphName), log)
case SimulateTimeoutRequest() => try2Message(sender(), doSimulateTimeout(), log)
case other =>
sender ! Status.Failure(
UnexpectedMessageException(s"Unexpected message $other of type ${other.getClass.getCanonicalName}"))
}

/**
  * Simulates a read timeout by submitting a trivial SELECT query with `simulateTimeout` enabled.
  *
  * @return the result of `sparqlHttpSelect` for the simulated request.
  */
private def doSimulateTimeout(): Try[SparqlSelectResult] =
  sparqlHttpSelect(
    // The query itself is a placeholder: it only binds a constant to ?foo.
    sparql = """SELECT ?foo WHERE {
               | BIND("foo" AS ?foo)
               |}""".stripMargin,
    simulateTimeout = true
  )

/**
* Given a SPARQL SELECT query string, runs the query, returning the result as a [[SparqlSelectResult]].
*
* @param sparql the SPARQL SELECT query string.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return a [[SparqlSelectResult]].
*/
private def sparqlHttpSelect(sparql: String): Try[SparqlSelectResult] = {
private def sparqlHttpSelect(sparql: String, simulateTimeout: Boolean = false): Try[SparqlSelectResult] = {
def parseJsonResponse(sparql: String, resultStr: String): Try[SparqlSelectResult] = {
val parseTry = Try {
resultStr.parseJson.convertTo[SparqlSelectResult]
Expand All @@ -265,7 +278,7 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
Try(FakeTriplestore.data(sparql))
} else {
// No: get the response from the real triplestore over HTTP.
getSparqlHttpResponse(sparql, isUpdate = false)
getSparqlHttpResponse(sparql, isUpdate = false, simulateTimeout = simulateTimeout)
}

// Are we preparing a fake triplestore?
Expand Down Expand Up @@ -833,11 +846,13 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param sparql the SPARQL request to be submitted.
* @param isUpdate `true` if this is an update request.
* @param acceptMimeType the MIME type to be provided in the HTTP Accept header.
* @param simulateTimeout if `true`, simulate a read timeout.
* @return the triplestore's response.
*/
private def getSparqlHttpResponse(sparql: String,
isUpdate: Boolean,
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson): Try[String] = {
acceptMimeType: String = mimeTypeApplicationSparqlResultsJson,
simulateTimeout: Boolean = false): Try[String] = {

val httpContext: HttpClientContext = makeHttpContext

Expand Down Expand Up @@ -869,7 +884,8 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
client = httpClient,
request = httpPost,
context = httpContext,
processResponse = returnResponseAsString
processResponse = returnResponseAsString,
simulateTimeout = simulateTimeout
)
}

Expand Down Expand Up @@ -990,18 +1006,24 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat
* @param request the request to be sent.
* @param context the request context to be used.
* @param processResponse a function that processes the HTTP response.
* @param simulateTimeout if `true`, simulate a read timeout.
* @tparam T the return type of `processResponse`.
* @return the return value of `processResponse`.
*/
private def doHttpRequest[T](client: CloseableHttpClient,
request: HttpRequest,
context: HttpClientContext,
processResponse: CloseableHttpResponse => T): Try[T] = {
processResponse: CloseableHttpResponse => T,
simulateTimeout: Boolean = false): Try[T] = {
// Make an Option wrapper for the response, so we can close it if we get one,
// even if an error occurs.
var maybeResponse: Option[CloseableHttpResponse] = None

val triplestoreResponseTry = Try {
if (simulateTimeout) {
throw new java.net.SocketTimeoutException("Simulated read timeout")
}

val start = System.currentTimeMillis()
val response = client.execute(targetHost, request, context)
maybeResponse = Some(response)
Expand All @@ -1028,15 +1050,21 @@ class HttpTriplestoreConnector extends Actor with ActorLogging with Instrumentat

maybeResponse.foreach(_.close)

// TODO: Can we throw a more user-friendly exception if the query timed out?
// TODO: Can we make Fuseki abandon the query if it takes too long?

triplestoreResponseTry.recover {
case tre: TriplestoreResponseException => throw tre

case socketTimeoutException: java.net.SocketTimeoutException =>
val message =
"The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help."
log.error(socketTimeoutException, message)
throw TriplestoreTimeoutException(message = message, e = socketTimeoutException, log = log)

case e: Exception =>
log.error(e, s"Failed to connect to triplestore")
throw TriplestoreConnectionException(s"Failed to connect to triplestore", e, log)
val message = "Failed to connect to triplestore"
log.error(e, message)
throw TriplestoreConnectionException(message = message, e = e, log = log)
}
}

Expand Down
Expand Up @@ -21,6 +21,24 @@ scala_test(
] + BASE_TEST_DEPENDENCIES,
)

# Test target for HttpTriplestoreConnectorSpec.scala, the spec that sends a
# SimulateTimeoutRequest and checks the resulting timeout error.
scala_test(
name = "HttpTriplestoreConnectorSpec",
size = "small",
srcs = [
"HttpTriplestoreConnectorSpec.scala",
],
# Runtime data made available to the test.
data = [
"//knora-ontologies",
"//test_data",
],
# Passes fuseki.conf as the `config.resource` system property to the test JVM.
jvm_flags = ["-Dconfig.resource=fuseki.conf"],
# unused_dependency_checker_mode = "warn",
deps = ALL_WEBAPI_MAIN_DEPENDENCIES + [
"//webapi:main_library",
"//webapi:test_library",
] + BASE_TEST_DEPENDENCIES,
)

scala_test(
name = "GraphDBConsistencyCheckingSpec",
size = "small", # 60s
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright © 2015-2021 the contributors (see Contributors.md).
*
* This file is part of Knora.
*
* Knora is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Knora is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with Knora. If not, see <http://www.gnu.org/licenses/>.
*/

package org.knora.webapi.store.triplestore

import akka.testkit.ImplicitSender
import org.knora.webapi.CoreSpec
import org.knora.webapi.exceptions.TriplestoreTimeoutException
import org.knora.webapi.messages.store.triplestoremessages.SimulateTimeoutRequest

import scala.concurrent.duration._

/**
  * Checks how the triplestore connector reports a (simulated) read timeout.
  *
  * The spec sends a [[SimulateTimeoutRequest]] to the store manager and expects the reply to be a
  * `Status.Failure` whose cause is a [[TriplestoreTimeoutException]] carrying the user-facing
  * timeout message.
  */
class HttpTriplestoreConnectorSpec extends CoreSpec() with ImplicitSender {
  // Maximum time to wait for the store manager's reply.
  private val timeout = 10.seconds

  // The message the connector is expected to attach to a read-timeout failure.
  private val expectedTimeoutMessage =
    "The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help."

  "The HttpTriplestoreConnector" should {
    // The simulated failure is a read timeout, not a connection timeout.
    "report a read timeout with an appropriate error message" in {
      storeManager ! SimulateTimeoutRequest()

      // Matching on the cause type replaces the isInstanceOf check: any other reply (including a
      // failure with a different cause) is rejected by expectMsgPF and reported with the message received.
      expectMsgPF(timeout) {
        case akka.actor.Status.Failure(cause: TriplestoreTimeoutException) =>
          assert(cause.getMessage == expectedTimeoutMessage)
      }
    }
  }
}