Skip to content


fix: Ask timeouts when requesting projects (DEV-1386) (#2235)
Browse files Browse the repository at this point in the history
  • Loading branch information
irinaschubert committed Oct 6, 2022
1 parent 748f83b commit 1820367
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 150 deletions.
2 changes: 1 addition & 1 deletion webapi/src/main/resources/application.conf
Expand Up @@ -18,7 +18,7 @@ akka {
# SYN-packets and connection attempts may fail. Note, that the backlog
# size is usually only a maximum size hint for the OS and the OS can
# restrict the number further based on global limits.
backlog = 100
backlog = 1024

# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle connection timeouts.
Expand Down
Expand Up @@ -63,7 +63,7 @@ object AppRouter {
val populateOntologyCaches: UIO[Unit] = {

val request = LoadOntologiesRequestV2(requestingUser = KnoraSystemInstances.Users.SystemUser)
val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(3, scala.concurrent.duration.SECONDS))
val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(60, scala.concurrent.duration.SECONDS))

for {
response <- ZIO.fromFuture(_ => (ref.ask(request)(timeout)).mapTo[SuccessResponseV2]).orDie
Expand Down
Expand Up @@ -6,8 +6,8 @@

import akka.http.scaladsl.util.FastFuture
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext
import scala.concurrent._
Expand Down Expand Up @@ -204,7 +204,6 @@ trait ConstructToSelectTransformer extends WhereTransformer {
object QueryTraverser {
private implicit val stringFormatter: StringFormatter = StringFormatter.getGeneralInstance
private implicit val timeout: Timeout = Duration(5, SECONDS)

* Helper method that analyzed an RDF Entity and returns a sequence of Ontology IRIs that are being referenced by the entity.
Expand All @@ -225,20 +224,21 @@ object QueryTraverser {
val maybeOntoIri = map.get(internal)
maybeOntoIri match {
// if the map contains an ontology IRI corresponding to the entity IRI, then this can be returned
case Some(iri) => Future(Seq(iri))
case Some(iri) => FastFuture.successful(Seq(iri))
case None => {
// if the map doesn't contain a corresponding ontology IRI, then the entity IRI points to a resource or value
// in that case, all ontologies of the project, to which the entity belongs, should be returned.
val shortcode = internal.getProjectCode
shortcode match {
case None => Future(Seq.empty)
case None => FastFuture.successful(Seq.empty)
case Some(_) => {
// find the project with the shortcode

for {
projectMaybe <- appActor
.ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode)))
projectMaybe <-
.ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode)))(Duration(100, SECONDS))
projectOntologies = projectMaybe match {
case None => Seq.empty
// return all ontologies of the project
Expand All @@ -250,7 +250,7 @@ object QueryTraverser {
case _ => Future(Seq.empty)
case _ => FastFuture.successful(Seq.empty)

Expand Down
Expand Up @@ -239,8 +239,6 @@ class ProjectsResponderADM(responderData: ResponderData) extends Responder(respo
log.debug("getSingleProjectADM - could not retrieve project: {}", identifier.value)

_ = appActor.ask(CacheServiceFlushDB(KnoraSystemInstances.Users.SystemUser))

} yield maybeProjectADM

Expand Down
Expand Up @@ -11,6 +11,7 @@ import zio.metrics.Metric

import java.time.temporal.ChronoUnit

import org.knora.webapi.config.AppConfig
import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectADM
import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectIdentifierADM
import org.knora.webapi.messages.admin.responder.usersmessages.UserADM
Expand All @@ -24,131 +25,134 @@ trait CacheServiceManager {

object CacheServiceManager {
val layer: ZLayer[CacheService, Nothing, CacheServiceManager] =

val layer: ZLayer[CacheService & AppConfig, Nothing, CacheServiceManager] =
ZLayer {
for {
cacheService <- ZIO.service[CacheService]
} yield new CacheServiceManager {

val cacheServiceWriteUserTimer = Metric
name = "cache-service-write-user",
chronoUnit = ChronoUnit.NANOS

val cacheServiceWriteProjectTimer = Metric
name = "cache-service-write-project",
chronoUnit = ChronoUnit.NANOS

val cacheServiceReadProjectTimer = Metric
name = "cache-service-read-project",
chronoUnit = ChronoUnit.NANOS

override def receive(msg: CacheServiceRequest) = msg match {
case CacheServicePutUserADM(value) => putUserADM(value)
case CacheServiceGetUserADM(identifier) => getUserADM(identifier)
case CacheServicePutProjectADM(value) => putProjectADM(value)
case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier)
case CacheServicePutString(key, value) => writeStringValue(key, value)
case CacheServiceGetString(key) => getStringValue(key)
case CacheServiceRemoveValues(keys) => removeValues(keys)
case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser)
case CacheServiceGetStatus => ping()
case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other")

* Stores the user under the IRI and additionally the IRI under the keys of
* IRI -> byte array
* username -> IRI
* email -> IRI
* @param value the stored value
def putUserADM(value: UserADM): Task[Unit] =
for {
res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration
// _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value))
} yield res

* Retrieves the user stored under the identifier (either iri, username,
* or email).
* @param id the project identifier.
def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] =

* Stores the project under the IRI and additionally the IRI under the keys
* IRI -> byte array
* shortname -> IRI
* shortcode -> IRI
* @param value the stored value
def putProjectADM(value: ProjectADM): Task[Unit] =
for {
res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration
// _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

* Retrieves the project stored under the identifier (either iri, shortname, or shortcode).
* @param identifier the project identifier.
def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] =
for {
res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration
// _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

* Get value stored under the key as a string.
* @param k the key.
def getStringValue(k: String): Task[Option[String]] =

* Store string or byte array value under key.
* @param k the key.
* @param v the value.
def writeStringValue(k: String, v: String): Task[Unit] =
cacheService.putStringValue(k, v)

* Removes values for the provided keys. Any invalid keys are ignored.
* @param keys the keys.
def removeValues(keys: Set[String]): Task[Unit] =

* Flushes (removes) all stored content from the store.
def flushDB(requestingUser: UserADM): Task[Unit] =

* Pings the cache service to see if it is available.
def ping(): UIO[CacheServiceStatusResponse] =
} yield CacheServiceManagerImpl(cacheService)

private final case class CacheServiceManagerImpl(cacheService: CacheService) extends CacheServiceManager {

val cacheServiceWriteUserTimer = Metric
name = "cache-service-write-user",
chronoUnit = ChronoUnit.NANOS

val cacheServiceWriteProjectTimer = Metric
name = "cache-service-write-project",
chronoUnit = ChronoUnit.NANOS

val cacheServiceReadProjectTimer = Metric
name = "cache-service-read-project",
chronoUnit = ChronoUnit.NANOS

override def receive(msg: CacheServiceRequest) = msg match {
case CacheServicePutUserADM(value) => putUserADM(value)
case CacheServiceGetUserADM(identifier) => getUserADM(identifier)
case CacheServicePutProjectADM(value) => putProjectADM(value)
case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier)
case CacheServicePutString(key, value) => writeStringValue(key, value)
case CacheServiceGetString(key) => getStringValue(key)
case CacheServiceRemoveValues(keys) => removeValues(keys)
case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser)
case CacheServiceGetStatus => ping()
case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other")

* Stores the user under the IRI and additionally the IRI under the keys of
* IRI -> byte array
* username -> IRI
* email -> IRI
* @param value the stored value
def putUserADM(value: UserADM): Task[Unit] =
for {
res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration
// _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value))
} yield res

* Retrieves the user stored under the identifier (either iri, username,
* or email).
* @param id the project identifier.
def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] =

* Stores the project under the IRI and additionally the IRI under the keys
* IRI -> byte array
* shortname -> IRI
* shortcode -> IRI
* @param value the stored value
def putProjectADM(value: ProjectADM): Task[Unit] =
for {
res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration
// _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

* Retrieves the project stored under the identifier (either iri, shortname, or shortcode).
* @param identifier the project identifier.
def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] =
for {
res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration
// _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

* Get value stored under the key as a string.
* @param k the key.
def getStringValue(k: String): Task[Option[String]] =

* Store string or byte array value under key.
* @param k the key.
* @param v the value.
def writeStringValue(k: String, v: String): Task[Unit] =
cacheService.putStringValue(k, v)

* Removes values for the provided keys. Any invalid keys are ignored.
* @param keys the keys.
def removeValues(keys: Set[String]): Task[Unit] =

* Flushes (removes) all stored content from the store.
def flushDB(requestingUser: UserADM): Task[Unit] =

* Pings the cache service to see if it is available.
def ping(): UIO[CacheServiceStatusResponse] =
Expand Up @@ -1034,7 +1034,7 @@ object TriplestoreServiceHttpConnectorImpl {
// Configure total max or per route limits for persistent connections
// that can be kept in the pool or leased by the connection manager.

// Sipi custom default request config
val defaultRequestConfig = RequestConfig
Expand Down
Expand Up @@ -60,20 +60,6 @@ class ProjectsResponderADMSpec extends CoreSpec with ImplicitSender {

/* Images project */
appActor ! ProjectGetRequestADM(
identifier = ProjectIdentifierADM(maybeIri = Some(,
requestingUser = SharedTestDataADM.rootUser

/* 'SystemProject' */
appActor ! ProjectGetRequestADM(
identifier = ProjectIdentifierADM(maybeIri = Some(,
requestingUser = SharedTestDataADM.rootUser


"return information about a project identified by shortname" in {
Expand Down

0 comments on commit 1820367

Please sign in to comment.