Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix(OntologyResponderV2): Add a global ontology cache lock (#1637)
  • Loading branch information
Benjamin Geer committed Apr 27, 2020
1 parent 4f88f4f commit 1853865
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 36 deletions.
Expand Up @@ -60,7 +60,7 @@ object IriLocker {
/**
* The number of times to try to acquire a lock before giving up.
*/
private val MAX_LOCK_RETRIES = 40
private val MAX_LOCK_RETRIES = 150

/**
* The total number of milliseconds that an API request should wait before giving up on acquiring a lock.
Expand Down
Expand Up @@ -61,16 +61,17 @@ import scala.concurrent.Future
*/
class OntologyResponderV2(responderData: ResponderData) extends Responder(responderData) {

/**
* The name of the ontology cache.
*/
// The name of the ontology cache.
private val OntologyCacheName = "ontologyCache"

/**
* The cache key under which cached ontology data is stored.
*/
// The cache key under which cached ontology data is stored.
private val OntologyCacheKey = "ontologyCacheData"

// The global ontology cache lock. This is needed because every ontology update replaces the whole ontology cache
// (because definitions in one ontology can refer to definitions in another ontology). Without a global lock,
// concurrent updates (even to different ontologies) could overwrite each other.
private val ONTOLOGY_CACHE_LOCK_IRI = "http://rdfh.ch/ontologies"

/**
* The in-memory cache of ontologies.
*
Expand All @@ -97,7 +98,7 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
case StandoffEntityInfoGetRequestV2(standoffClassIris, standoffPropertyIris, requestingUser) => getStandoffEntityInfoResponseV2(standoffClassIris, standoffPropertyIris, requestingUser)
case StandoffClassesWithDataTypeGetRequestV2(requestingUser) => getStandoffStandoffClassesWithDataTypeV2(requestingUser)
case StandoffAllPropertyEntitiesGetRequestV2(requestingUser) => getAllStandoffPropertyEntitiesV2(requestingUser)
case CheckSubClassRequestV2(subClassIri, superClassIri, requestingUser) => checkSubClassV2(subClassIri, superClassIri, requestingUser)
case CheckSubClassRequestV2(subClassIri, superClassIri, requestingUser) => checkSubClassV2(subClassIri, superClassIri)
case SubClassesGetRequestV2(resourceClassIri, requestingUser) => getSubClassesV2(resourceClassIri, requestingUser)
case OntologyKnoraEntityIrisGetRequestV2(namedGraphIri, requestingUser) => getKnoraEntityIrisInNamedGraphV2(namedGraphIri, requestingUser)
case OntologyEntitiesGetRequestV2(ontologyIri, allLanguages, requestingUser) => getOntologyEntitiesV2(ontologyIri, allLanguages, requestingUser)
Expand Down Expand Up @@ -1365,7 +1366,7 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
* @param superClassIri the IRI of the resource or value class to check for (whether it is a a super class of `subClassIri` or not).
* @return a [[CheckSubClassResponseV2]].
*/
private def checkSubClassV2(subClassIri: SmartIri, superClassIri: SmartIri, requestingUser: UserADM): Future[CheckSubClassResponseV2] = {
private def checkSubClassV2(subClassIri: SmartIri, superClassIri: SmartIri): Future[CheckSubClassResponseV2] = {
for {
cacheData <- getCacheData
response = CheckSubClassResponseV2(
Expand Down Expand Up @@ -1771,10 +1772,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
// Make the internal ontology IRI.
internalOntologyIri = stringFormatter.makeProjectSpecificInternalOntologyIri(validOntologyName, createOntologyRequest.isShared, projectInfo.project.shortcode)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = createOntologyRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(internalOntologyIri)
)
} yield taskResult
Expand Down Expand Up @@ -1848,10 +1849,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
_ <- checkExternalOntologyIriForUpdate(changeOntologyMetadataRequest.ontologyIri)
internalOntologyIri = changeOntologyMetadataRequest.ontologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = changeOntologyMetadataRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(internalOntologyIri = internalOntologyIri)
)
} yield taskResult
Expand Down Expand Up @@ -2027,10 +2028,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalClassIri = externalClassIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = createClassRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalClassIri = internalClassIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -2387,10 +2388,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalClassIri = externalClassIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = addCardinalitiesRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalClassIri = internalClassIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -2552,10 +2553,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalClassIri = externalClassIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = changeCardinalitiesRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalClassIri = internalClassIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -2653,10 +2654,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalClassIri = externalClassIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = deleteClassRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalClassIri = internalClassIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -2771,10 +2772,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalPropertyIri = externalPropertyIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = deletePropertyRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalPropertyIri = internalPropertyIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -2872,10 +2873,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
_ <- checkExternalOntologyIriForUpdate(deleteOntologyRequest.ontologyIri)
internalOntologyIri = deleteOntologyRequest.ontologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = deleteOntologyRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalOntologyIri = internalOntologyIri
)
Expand Down Expand Up @@ -3147,10 +3148,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalPropertyIri = externalPropertyIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = createPropertyRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalPropertyIri = internalPropertyIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -3308,10 +3309,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalPropertyIri = externalPropertyIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = changePropertyLabelsOrCommentsRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalPropertyIri = internalPropertyIri,
internalOntologyIri = internalOntologyIri
Expand Down Expand Up @@ -3430,10 +3431,10 @@ class OntologyResponderV2(responderData: ResponderData) extends Responder(respon
internalClassIri = externalClassIri.toOntologySchema(InternalSchema)
internalOntologyIri = externalOntologyIri.toOntologySchema(InternalSchema)

// Do the remaining pre-update checks and the update while holding an update lock on the ontology.
// Do the remaining pre-update checks and the update while holding a global ontology cache lock.
taskResult <- IriLocker.runWithIriLock(
apiRequestID = changeClassLabelsOrCommentsRequest.apiRequestID,
iri = internalOntologyIri.toString,
iri = ONTOLOGY_CACHE_LOCK_IRI,
task = () => makeTaskFuture(
internalClassIri = internalClassIri,
internalOntologyIri = internalOntologyIri
Expand Down
6 changes: 3 additions & 3 deletions webapi/src/main/scala/org/knora/webapi/util/CacheUtil.scala
Expand Up @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory
*/
object CacheUtil {

val log = Logger(LoggerFactory.getLogger("org.knora.webapi.util.cache"))
val log: Logger = Logger(LoggerFactory.getLogger("org.knora.webapi.util.cache"))

/**
* Represents the configuration of a Knora application cache.
Expand All @@ -57,7 +57,7 @@ object CacheUtil {
* `overflowToDisk`, `eternal`, `timeToLiveSeconds`, and `timeToIdleSeconds`,
* representing configuration options for Ehcache caches.
*/
def createCaches(cacheConfigs: Seq[KnoraCacheConfig]) = {
def createCaches(cacheConfigs: Seq[KnoraCacheConfig]): Unit = {
val cacheManager = CacheManager.getInstance()
cacheConfigs.foreach {
cacheConfig =>
Expand All @@ -76,7 +76,7 @@ object CacheUtil {
/**
* Removes all caches.
*/
def removeAllCaches() = {
def removeAllCaches(): Unit = {
val cacheManager = CacheManager.getInstance()
cacheManager.removeAllCaches()
// println("CacheUtil: Removed all application caches")
Expand Down
Expand Up @@ -21,7 +21,7 @@ class IriLockerSpec extends WordSpec with Matchers {
"IriLocker" should {
"not allow a request to acquire a lock when another request already has it" in {
def runLongTask(): Future[String] = Future {
Thread.sleep(4500)
Thread.sleep(16000)
SUCCESS
}

Expand Down Expand Up @@ -49,7 +49,7 @@ class IriLockerSpec extends WordSpec with Matchers {
)

val secondTaskFailedWithLockTimeout = try {
Await.result(secondTaskResultFuture, 5.seconds)
Await.result(secondTaskResultFuture, 20.seconds)
false
} catch {
case ale: ApplicationLockException => true
Expand Down

0 comments on commit 1853865

Please sign in to comment.