diff --git a/webapi/src/main/resources/application.conf b/webapi/src/main/resources/application.conf index 0afd07e262..fc9545bd0e 100644 --- a/webapi/src/main/resources/application.conf +++ b/webapi/src/main/resources/application.conf @@ -18,7 +18,7 @@ akka { # SYN-packets and connection attempts may fail. Note, that the backlog # size is usually only a maximum size hint for the OS and the OS can # restrict the number further based on global limits. - backlog = 100 + backlog = 1024 # The time after which an idle connection will be automatically closed. # Set to `infinite` to completely disable idle connection timeouts. diff --git a/webapi/src/main/scala/org/knora/webapi/core/AppRouter.scala b/webapi/src/main/scala/org/knora/webapi/core/AppRouter.scala index 41a9a52885..94fc9cbee6 100644 --- a/webapi/src/main/scala/org/knora/webapi/core/AppRouter.scala +++ b/webapi/src/main/scala/org/knora/webapi/core/AppRouter.scala @@ -63,7 +63,7 @@ object AppRouter { val populateOntologyCaches: UIO[Unit] = { val request = LoadOntologiesRequestV2(requestingUser = KnoraSystemInstances.Users.SystemUser) - val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(3, scala.concurrent.duration.SECONDS)) + val timeout = Timeout(new scala.concurrent.duration.FiniteDuration(60, scala.concurrent.duration.SECONDS)) for { response <- ZIO.fromFuture(_ => (ref.ask(request)(timeout)).mapTo[SuccessResponseV2]).orDie diff --git a/webapi/src/main/scala/org/knora/webapi/messages/util/search/QueryTraverser.scala b/webapi/src/main/scala/org/knora/webapi/messages/util/search/QueryTraverser.scala index 731619d18b..85ce575294 100644 --- a/webapi/src/main/scala/org/knora/webapi/messages/util/search/QueryTraverser.scala +++ b/webapi/src/main/scala/org/knora/webapi/messages/util/search/QueryTraverser.scala @@ -6,8 +6,8 @@ package org.knora.webapi.messages.util.search import akka.actor.ActorRef +import akka.http.scaladsl.util.FastFuture import akka.pattern.ask -import akka.util.Timeout import scala.concurrent.ExecutionContext import scala.concurrent._ @@ -204,7 +204,6 @@ trait ConstructToSelectTransformer extends WhereTransformer { */ object QueryTraverser { private implicit val stringFormatter: StringFormatter = StringFormatter.getGeneralInstance - private implicit val timeout: Timeout = Duration(5, SECONDS) /** * Helper method that analyzed an RDF Entity and returns a sequence of Ontology IRIs that are being referenced by the entity. @@ -225,20 +224,21 @@ object QueryTraverser { val maybeOntoIri = map.get(internal) maybeOntoIri match { // if the map contains an ontology IRI corresponding to the entity IRI, then this can be returned - case Some(iri) => Future(Seq(iri)) + case Some(iri) => FastFuture.successful(Seq(iri)) case None => { // if the map doesn't contain a corresponding ontology IRI, then the entity IRI points to a resource or value // in that case, all ontologies of the project, to which the entity belongs, should be returned. val shortcode = internal.getProjectCode shortcode match { - case None => Future(Seq.empty) + case None => FastFuture.successful(Seq.empty) case Some(_) => { // find the project with the shortcode for { - projectMaybe <- appActor - .ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode))) - .mapTo[Option[ProjectADM]] + projectMaybe <- + appActor + .ask(ProjectGetADM(ProjectIdentifierADM(maybeShortcode = shortcode)))(Duration(100, SECONDS)) + .mapTo[Option[ProjectADM]] projectOntologies = projectMaybe match { case None => Seq.empty // return all ontologies of the project @@ -250,7 +250,7 @@ object QueryTraverser { } } } - case _ => Future(Seq.empty) + case _ => FastFuture.successful(Seq.empty) } /** diff --git a/webapi/src/main/scala/org/knora/webapi/responders/admin/ProjectsResponderADM.scala b/webapi/src/main/scala/org/knora/webapi/responders/admin/ProjectsResponderADM.scala index ab04303a98..69e1906ade 100644 --- a/webapi/src/main/scala/org/knora/webapi/responders/admin/ProjectsResponderADM.scala +++ b/webapi/src/main/scala/org/knora/webapi/responders/admin/ProjectsResponderADM.scala @@ -239,8 +239,6 @@ class ProjectsResponderADM(responderData: ResponderData) extends Responder(respo log.debug("getSingleProjectADM - could not retrieve project: {}", identifier.value) } - _ = appActor.ask(CacheServiceFlushDB(KnoraSystemInstances.Users.SystemUser)) - } yield maybeProjectADM } diff --git a/webapi/src/main/scala/org/knora/webapi/store/cache/CacheServiceManager.scala b/webapi/src/main/scala/org/knora/webapi/store/cache/CacheServiceManager.scala index 8ba1f1a251..6156fbde2b 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/cache/CacheServiceManager.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/cache/CacheServiceManager.scala @@ -11,6 +11,7 @@ import zio.metrics.Metric import java.time.temporal.ChronoUnit +import org.knora.webapi.config.AppConfig import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectADM import org.knora.webapi.messages.admin.responder.projectsmessages.ProjectIdentifierADM import org.knora.webapi.messages.admin.responder.usersmessages.UserADM @@ -24,131 +25,134 @@ trait CacheServiceManager { } object CacheServiceManager { - val layer: ZLayer[CacheService, Nothing, CacheServiceManager] = + + val layer: ZLayer[CacheService & AppConfig, Nothing, CacheServiceManager] = ZLayer { for { cacheService <- ZIO.service[CacheService] - } yield new CacheServiceManager { - - val cacheServiceWriteUserTimer = Metric - .timer( - name = "cache-service-write-user", - chronoUnit = ChronoUnit.NANOS - ) - - val cacheServiceWriteProjectTimer = Metric - .timer( - name = "cache-service-write-project", - chronoUnit = ChronoUnit.NANOS - ) - - val cacheServiceReadProjectTimer = Metric - .timer( - name = "cache-service-read-project", - chronoUnit = ChronoUnit.NANOS - ) - - override def receive(msg: CacheServiceRequest) = msg match { - case CacheServicePutUserADM(value) => putUserADM(value) - case CacheServiceGetUserADM(identifier) => getUserADM(identifier) - case CacheServicePutProjectADM(value) => putProjectADM(value) - case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier) - case CacheServicePutString(key, value) => writeStringValue(key, value) - case CacheServiceGetString(key) => getStringValue(key) - case CacheServiceRemoveValues(keys) => removeValues(keys) - case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser) - case CacheServiceGetStatus => ping() - case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other") - } - - /** - * Stores the user under the IRI and additionally the IRI under the keys of - * USERNAME and EMAIL: - * - * IRI -> byte array - * username -> IRI - * email -> IRI - * - * @param value the stored value - */ - def putUserADM(value: UserADM): Task[Unit] = - for { - res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration - // _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value)) - } yield res - - /** - * Retrieves the user stored under the identifier (either iri, username, - * or email). - * - * @param id the project identifier. - */ - def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] = - cacheService.getUserADM(id) - - /** - * Stores the project under the IRI and additionally the IRI under the keys - * of SHORTCODE and SHORTNAME: - * - * IRI -> byte array - * shortname -> IRI - * shortcode -> IRI - * - * @param value the stored value - */ - def putProjectADM(value: ProjectADM): Task[Unit] = - for { - res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration - // _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value)) - } yield res - - /** - * Retrieves the project stored under the identifier (either iri, shortname, or shortcode). - * - * @param identifier the project identifier. - */ - def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] = - for { - res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration - // _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value)) - } yield res - - /** - * Get value stored under the key as a string. - * - * @param k the key. - */ - def getStringValue(k: String): Task[Option[String]] = - cacheService.getStringValue(k) - - /** - * Store string or byte array value under key. - * - * @param k the key. - * @param v the value. - */ - def writeStringValue(k: String, v: String): Task[Unit] = - cacheService.putStringValue(k, v) - - /** - * Removes values for the provided keys. Any invalid keys are ignored. - * - * @param keys the keys. - */ - def removeValues(keys: Set[String]): Task[Unit] = - cacheService.removeValues(keys) - - /** - * Flushes (removes) all stored content from the store. - */ - def flushDB(requestingUser: UserADM): Task[Unit] = - cacheService.flushDB(requestingUser) - - /** - * Pings the cache service to see if it is available. - */ - def ping(): UIO[CacheServiceStatusResponse] = - cacheService.getStatus - } + } yield CacheServiceManagerImpl(cacheService) + } + + private final case class CacheServiceManagerImpl(cacheService: CacheService) extends CacheServiceManager { + + val cacheServiceWriteUserTimer = Metric + .timer( + name = "cache-service-write-user", + chronoUnit = ChronoUnit.NANOS + ) + + val cacheServiceWriteProjectTimer = Metric + .timer( + name = "cache-service-write-project", + chronoUnit = ChronoUnit.NANOS + ) + + val cacheServiceReadProjectTimer = Metric + .timer( + name = "cache-service-read-project", + chronoUnit = ChronoUnit.NANOS + ) + + override def receive(msg: CacheServiceRequest) = msg match { + case CacheServicePutUserADM(value) => putUserADM(value) + case CacheServiceGetUserADM(identifier) => getUserADM(identifier) + case CacheServicePutProjectADM(value) => putProjectADM(value) + case CacheServiceGetProjectADM(identifier) => getProjectADM(identifier) + case CacheServicePutString(key, value) => writeStringValue(key, value) + case CacheServiceGetString(key) => getStringValue(key) + case CacheServiceRemoveValues(keys) => removeValues(keys) + case CacheServiceFlushDB(requestingUser) => flushDB(requestingUser) + case CacheServiceGetStatus => ping() + case other => ZIO.logError(s"CacheServiceManager received an unexpected message: $other") } + + /** + * Stores the user under the IRI and additionally the IRI under the keys of + * USERNAME and EMAIL: + * + * IRI -> byte array + * username -> IRI + * email -> IRI + * + * @param value the stored value + */ + def putUserADM(value: UserADM): Task[Unit] = + for { + res <- cacheService.putUserADM(value) @@ cacheServiceWriteUserTimer.trackDuration + // _ <- cacheServiceWriteUserTimer.value.tap(value => ZIO.debug(value)) + } yield res + + /** + * Retrieves the user stored under the identifier (either iri, username, + * or email). + * + * @param id the project identifier. + */ + def getUserADM(id: UserIdentifierADM): Task[Option[UserADM]] = + cacheService.getUserADM(id) + + /** + * Stores the project under the IRI and additionally the IRI under the keys + * of SHORTCODE and SHORTNAME: + * + * IRI -> byte array + * shortname -> IRI + * shortcode -> IRI + * + * @param value the stored value + */ + def putProjectADM(value: ProjectADM): Task[Unit] = + for { + res <- cacheService.putProjectADM(value) @@ cacheServiceWriteProjectTimer.trackDuration + // _ <- cacheServiceWriteProjectTimer.value.tap(value => ZIO.debug(value)) + } yield res + + /** + * Retrieves the project stored under the identifier (either iri, shortname, or shortcode). + * + * @param identifier the project identifier. + */ + def getProjectADM(id: ProjectIdentifierADM): Task[Option[ProjectADM]] = + for { + res <- cacheService.getProjectADM(id) @@ cacheServiceReadProjectTimer.trackDuration + // _ <- cacheServiceReadProjectTimer.value.tap(value => ZIO.debug(value)) + } yield res + + /** + * Get value stored under the key as a string. + * + * @param k the key. + */ + def getStringValue(k: String): Task[Option[String]] = + cacheService.getStringValue(k) + + /** + * Store string or byte array value under key. + * + * @param k the key. + * @param v the value. + */ + def writeStringValue(k: String, v: String): Task[Unit] = + cacheService.putStringValue(k, v) + + /** + * Removes values for the provided keys. Any invalid keys are ignored. + * + * @param keys the keys. + */ + def removeValues(keys: Set[String]): Task[Unit] = + cacheService.removeValues(keys) + + /** + * Flushes (removes) all stored content from the store. + */ + def flushDB(requestingUser: UserADM): Task[Unit] = + cacheService.flushDB(requestingUser) + + /** + * Pings the cache service to see if it is available. + */ + def ping(): UIO[CacheServiceStatusResponse] = + cacheService.getStatus + } } diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceHttpConnectorImpl.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceHttpConnectorImpl.scala index 492df75dba..10e0810a2d 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceHttpConnectorImpl.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceHttpConnectorImpl.scala @@ -1034,7 +1034,7 @@ object TriplestoreServiceHttpConnectorImpl { // Configure total max or per route limits for persistent connections // that can be kept in the pool or leased by the connection manager. connManager.setMaxTotal(100) - connManager.setDefaultMaxPerRoute(10) + connManager.setDefaultMaxPerRoute(15) // Sipi custom default request config val defaultRequestConfig = RequestConfig diff --git a/webapi/src/test/scala/org/knora/webapi/responders/admin/ProjectsResponderADMSpec.scala b/webapi/src/test/scala/org/knora/webapi/responders/admin/ProjectsResponderADMSpec.scala index 7ae4d1c46f..a50f92153b 100644 --- a/webapi/src/test/scala/org/knora/webapi/responders/admin/ProjectsResponderADMSpec.scala +++ b/webapi/src/test/scala/org/knora/webapi/responders/admin/ProjectsResponderADMSpec.scala @@ -60,20 +60,6 @@ class ProjectsResponderADMSpec extends CoreSpec with ImplicitSender { ) expectMsg(ProjectGetResponseADM(SharedTestDataADM.incunabulaProject)) - /* Images project */ - appActor ! ProjectGetRequestADM( - identifier = ProjectIdentifierADM(maybeIri = Some(SharedTestDataADM.imagesProject.id)), - requestingUser = SharedTestDataADM.rootUser - ) - expectMsg(ProjectGetResponseADM(SharedTestDataADM.imagesProject)) - - /* 'SystemProject' */ - appActor ! ProjectGetRequestADM( - identifier = ProjectIdentifierADM(maybeIri = Some(SharedTestDataADM.systemProject.id)), - requestingUser = SharedTestDataADM.rootUser - ) - expectMsg(ProjectGetResponseADM(SharedTestDataADM.systemProject)) - } "return information about a project identified by shortname" in {