Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ask timeouts when requesting projects (DEV-1386) #2235

Merged
merged 9 commits into from Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion webapi/src/main/resources/application.conf
Expand Up @@ -18,7 +18,7 @@ akka {
# SYN-packets and connection attempts may fail. Note, that the backlog
# size is usually only a maximum size hint for the OS and the OS can
# restrict the number further based on global limits.
backlog = 100
backlog = 1024

# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle connection timeouts.
Expand Down
Expand Up @@ -63,7 +63,7 @@ object AppRouter {
val populateOntologyCaches: UIO[Unit] = {

val request = LoadOntologiesRequestV2(requestingUser = KnoraSystemInstances.Users.SystemUser)
val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(3, scala.concurrent.duration.SECONDS))
val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(60, scala.concurrent.duration.SECONDS))

for {
response <- ZIO.fromFuture(_ => (ref.ask(request)(timeout)).mapTo[SuccessResponseV2]).orDie
Expand Down
Expand Up @@ -6,8 +6,8 @@
package org.knora.webapi.messages.util.search

import akka.actor.ActorRef
import akka.http.scaladsl.util.FastFuture
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext
import scala.concurrent._
Expand Down Expand Up @@ -204,7 +204,6 @@ trait ConstructToSelectTransformer extends WhereTransformer {
*/
object QueryTraverser {
private implicit val stringFormatter: StringFormatter = StringFormatter.getGeneralInstance
private implicit val timeout: Timeout = Duration(5, SECONDS)

/**
* Helper method that analyzed an RDF Entity and returns a sequence of Ontology IRIs that are being referenced by the entity.
Expand All @@ -225,20 +224,21 @@ object QueryTraverser {
val maybeOntoIri = map.get(internal)
maybeOntoIri match {
// if the map contains an ontology IRI corresponding to the entity IRI, then this can be returned
case Some(iri) => Future(Seq(iri))
case Some(iri) => FastFuture.successful(Seq(iri))
case None => {
// if the map doesn't contain a corresponding ontology IRI, then the entity IRI points to a resource or value
// in that case, all ontologies of the project, to which the entity belongs, should be returned.
val shortcode = internal.getProjectCode
shortcode match {
case None => Future(Seq.empty)
case None => FastFuture.successful(Seq.empty)
case Some(_) => {
// find the project with the shortcode

for {
projectMaybe <- appActor
.ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode)))
.mapTo[Option[ProjectADM]]
projectMaybe <-
appActor
.ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode)))(Duration(100, SECONDS))
.mapTo[Option[ProjectADM]]
projectOntologies = projectMaybe match {
case None => Seq.empty
// return all ontologies of the project
Expand All @@ -250,7 +250,7 @@ object QueryTraverser {
}
}
}
case _ => Future(Seq.empty)
case _ => FastFuture.successful(Seq.empty)
}

/**
Expand Down
Expand Up @@ -239,8 +239,6 @@ class ProjectsResponderADM(responderData: ResponderData) extends Responder(respo
log.debug("getSingleProjectADM - could not retrieve project: {}", identifier.value)
}

_ = appActor.ask(CacheServiceFlushDB(KnoraSystemInstances.Users.SystemUser))

} yield maybeProjectADM
}

Expand Down
Expand Up @@ -11,6 +11,7 @@ import zio.metrics.Metric

import java.time.temporal.ChronoUnit

import org.knora.webapi.config.AppConfig
import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectADM
import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectIdentifierADM
import org.knora.webapi.messages.admin.responder.usersmessages.UserADM
Expand All @@ -24,131 +25,134 @@ trait CacheServiceManager {
}

object CacheServiceManager {
val layer: ZLayer[CacheService, Nothing, CacheServiceManager] =

val layer: ZLayer[CacheService & AppConfig, Nothing, CacheServiceManager] =
ZLayer {
for {
cacheService <- ZIO.service[CacheService]
} yield new CacheServiceManager {

val cacheServiceWriteUserTimer = Metric
.timer(
name = "cache-service-write-user",
chronoUnit = ChronoUnit.NANOS
)

val cacheServiceWriteProjectTimer = Metric
.timer(
name = "cache-service-write-project",
chronoUnit = ChronoUnit.NANOS
)

val cacheServiceReadProjectTimer = Metric
.timer(
name = "cache-service-read-project",
chronoUnit = ChronoUnit.NANOS
)

override def receive(msg: CacheServiceRequest) = msg match {
case CacheServicePutUserADM(value) => putUserADM(value)
case CacheServiceGetUserADM(identifier) => getUserADM(identifier)
case CacheServicePutProjectADM(value) => putProjectADM(value)
case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier)
case CacheServicePutString(key, value) => writeStringValue(key, value)
case CacheServiceGetString(key) => getStringValue(key)
case CacheServiceRemoveValues(keys) => removeValues(keys)
case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser)
case CacheServiceGetStatus => ping()
case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other")
}

/**
* Stores the user under the IRI and additionally the IRI under the keys of
* USERNAME and EMAIL:
*
* IRI -> byte array
* username -> IRI
* email -> IRI
*
* @param value the stored value
*/
def putUserADM(value: UserADM): Task[Unit] =
for {
res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration
// _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Retrieves the user stored under the identifier (either iri, username,
* or email).
*
* @param id the project identifier.
*/
def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] =
cacheService.getUserADM(id)

/**
* Stores the project under the IRI and additionally the IRI under the keys
* of SHORTCODE and SHORTNAME:
*
* IRI -> byte array
* shortname -> IRI
* shortcode -> IRI
*
* @param value the stored value
*/
def putProjectADM(value: ProjectADM): Task[Unit] =
for {
res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration
// _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Retrieves the project stored under the identifier (either iri, shortname, or shortcode).
*
* @param identifier the project identifier.
*/
def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] =
for {
res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration
// _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Get value stored under the key as a string.
*
* @param k the key.
*/
def getStringValue(k: String): Task[Option[String]] =
cacheService.getStringValue(k)

/**
* Store string or byte array value under key.
*
* @param k the key.
* @param v the value.
*/
def writeStringValue(k: String, v: String): Task[Unit] =
cacheService.putStringValue(k, v)

/**
* Removes values for the provided keys. Any invalid keys are ignored.
*
* @param keys the keys.
*/
def removeValues(keys: Set[String]): Task[Unit] =
cacheService.removeValues(keys)

/**
* Flushes (removes) all stored content from the store.
*/
def flushDB(requestingUser: UserADM): Task[Unit] =
cacheService.flushDB(requestingUser)

/**
* Pings the cache service to see if it is available.
*/
def ping(): UIO[CacheServiceStatusResponse] =
cacheService.getStatus
}
} yield CacheServiceManagerImpl(cacheService)
}

private final case class CacheServiceManagerImpl(cacheService: CacheService) extends CacheServiceManager {

val cacheServiceWriteUserTimer = Metric
.timer(
name = "cache-service-write-user",
chronoUnit = ChronoUnit.NANOS
)

val cacheServiceWriteProjectTimer = Metric
.timer(
name = "cache-service-write-project",
chronoUnit = ChronoUnit.NANOS
)

val cacheServiceReadProjectTimer = Metric
.timer(
name = "cache-service-read-project",
chronoUnit = ChronoUnit.NANOS
)

override def receive(msg: CacheServiceRequest) = msg match {
case CacheServicePutUserADM(value) => putUserADM(value)
case CacheServiceGetUserADM(identifier) => getUserADM(identifier)
case CacheServicePutProjectADM(value) => putProjectADM(value)
case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier)
case CacheServicePutString(key, value) => writeStringValue(key, value)
case CacheServiceGetString(key) => getStringValue(key)
case CacheServiceRemoveValues(keys) => removeValues(keys)
case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser)
case CacheServiceGetStatus => ping()
case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other")
}

/**
* Stores the user under the IRI and additionally the IRI under the keys of
* USERNAME and EMAIL:
*
* IRI -> byte array
* username -> IRI
* email -> IRI
*
* @param value the stored value
*/
def putUserADM(value: UserADM): Task[Unit] =
for {
res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration
// _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Retrieves the user stored under the identifier (either iri, username,
* or email).
*
* @param id the project identifier.
*/
def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] =
cacheService.getUserADM(id)

/**
* Stores the project under the IRI and additionally the IRI under the keys
* of SHORTCODE and SHORTNAME:
*
* IRI -> byte array
* shortname -> IRI
* shortcode -> IRI
*
* @param value the stored value
*/
def putProjectADM(value: ProjectADM): Task[Unit] =
for {
res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration
// _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Retrieves the project stored under the identifier (either iri, shortname, or shortcode).
*
* @param identifier the project identifier.
*/
def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] =
for {
res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration
// _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value))
} yield res

/**
* Get value stored under the key as a string.
*
* @param k the key.
*/
def getStringValue(k: String): Task[Option[String]] =
cacheService.getStringValue(k)

/**
* Store string or byte array value under key.
*
* @param k the key.
* @param v the value.
*/
def writeStringValue(k: String, v: String): Task[Unit] =
cacheService.putStringValue(k, v)

/**
* Removes values for the provided keys. Any invalid keys are ignored.
*
* @param keys the keys.
*/
def removeValues(keys: Set[String]): Task[Unit] =
cacheService.removeValues(keys)

/**
* Flushes (removes) all stored content from the store.
*/
def flushDB(requestingUser: UserADM): Task[Unit] =
cacheService.flushDB(requestingUser)

/**
* Pings the cache service to see if it is available.
*/
def ping(): UIO[CacheServiceStatusResponse] =
cacheService.getStatus
}
}
Expand Up @@ -1034,7 +1034,7 @@ object TriplestoreServiceHttpConnectorImpl {
// Configure total max or per route limits for persistent connections
// that can be kept in the pool or leased by the connection manager.
connManager.setMaxTotal(100)
connManager.setDefaultMaxPerRoute(10)
connManager.setDefaultMaxPerRoute(15)

// Sipi custom default request config
val defaultRequestConfig = RequestConfig
Expand Down
Expand Up @@ -60,20 +60,6 @@ class ProjectsResponderADMSpec extends CoreSpec with ImplicitSender {
)
expectMsg(ProjectGetResponseADM(SharedTestDataADM.incunabulaProject))

/* Images project */
appActor ! ProjectGetRequestADM(
identifier = ProjectIdentifierADM(maybeIri = Some(SharedTestDataADM.imagesProject.id)),
requestingUser = SharedTestDataADM.rootUser
)
expectMsg(ProjectGetResponseADM(SharedTestDataADM.imagesProject))

/* 'SystemProject' */
appActor ! ProjectGetRequestADM(
identifier = ProjectIdentifierADM(maybeIri = Some(SharedTestDataADM.systemProject.id)),
requestingUser = SharedTestDataADM.rootUser
)
expectMsg(ProjectGetResponseADM(SharedTestDataADM.systemProject))

}

"return information about a project identified by shortname" in {
Expand Down