High rate of ClosedChannelException during compaction #358

Open
hicolour opened this issue Mar 29, 2022 · 1 comment
Labels
bug Something isn't working

Comments

@hicolour

I'm getting a high rate of ClosedChannelException with the persistent map configuration described below.

  • Does it have any impact on data consistency?
  • Is there any way to tune compaction to reduce these errors?
java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
    at swaydb.core.io.file.ChannelFile.read(ChannelFile.scala:94)
    at swaydb.core.io.file.DBFile.read(DBFile.scala:412)
    at swaydb.core.io.reader.FileReader.read(FileReader.scala:73)
    at swaydb.core.segment.format.a.block.reader.BlockReader$.readFullBlock(BlockReader.scala:97)
    at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock(BlockReaderBase.scala:65)
    at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock$(BlockReaderBase.scala:64)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader.readFullBlock(UnblockedReader.scala:81)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader.readAllAndGetReader(UnblockedReader.scala:123)
    at swaydb.core.segment.format.a.block.Block$.unblock(Block.scala:299)
    at swaydb.core.segment.format.a.block.reader.UnblockedReader$.apply(UnblockedReader.scala:69)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$6(SegmentBlockCache.scala:202)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$5(SegmentBlockCache.scala:197)
    at swaydb.data.cache.SynchronisedIO.$anonfun$value$7(Cache.scala:323)
    at swaydb.data.cache.LazyIO.$anonfun$getOrSet$3(Lazy.scala:165)
    at swaydb.data.cache.LazyValue.$anonfun$getOrSet$2(Lazy.scala:96)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.LazyValue.$anonfun$getOrSet$1(Lazy.scala:95)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.LazyValue.getOrSet(Lazy.scala:93)
    at swaydb.data.cache.LazyIO.getOrSet(Lazy.scala:165)
    at swaydb.data.cache.SynchronisedIO.value(Cache.scala:323)
    at swaydb.data.cache.DeferredIO.value(Cache.scala:295)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$createReader$1(SegmentBlockCache.scala:330)
    at scala.Option.getOrElse(Option.scala:189)
    at swaydb.data.cache.DeferredIO.getOrElse(Cache.scala:302)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createReader(SegmentBlockCache.scala:328)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createSortedIndexReader(SegmentBlockCache.scala:470)
    at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.iterator(SegmentBlockCache.scala:531)
    at swaydb.core.segment.SegmentRef.iterator(SegmentRef.scala:889)
    at swaydb.core.segment.SegmentRef$.put(SegmentRef.scala:635)
    at swaydb.core.segment.PersistentSegmentOne.put(PersistentSegmentOne.scala:288)
    at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$2(Level.scala:1160)
    at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$1(Level.scala:1158)
    at swaydb.IO$IterableIOImplicit.mapRecoverIO(IO.scala:227)
    at swaydb.core.level.Level.putAssignedKeyValues(Level.scala:1155)
    at swaydb.core.level.Level.$anonfun$putKeyValues$2(Level.scala:1106)
    at swaydb.IO$Right.$anonfun$flatMap$1(IO.scala:570)
    at swaydb.IO$Right.flatMap(IO.scala:404)
    at swaydb.core.level.Level.putKeyValues(Level.scala:1098)
    at swaydb.core.level.Level.$anonfun$put$11(Level.scala:704)
    at swaydb.core.level.Level.$anonfun$reserveAndRelease$2(Level.scala:578)
    at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
    at swaydb.IO$Right.map(IO.scala:396)
    at swaydb.core.level.Level.$anonfun$reserveAndRelease$1(Level.scala:576)
    at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
    at swaydb.IO$Right.map(IO.scala:396)
    at swaydb.core.level.Level.reserveAndRelease(Level.scala:574)
    at swaydb.core.level.Level.put(Level.scala:694)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:191)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:176)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:155)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJob(ThrottleCompaction.scala:131)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJobs(ThrottleCompaction.scala:107)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runNow(ThrottleCompaction.scala:73)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:59)
    at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:48)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.doWakeUp(ThrottleCompactor.scala:287)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:309)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:51)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1(ThrottleCompactor.scala:232)
    at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1$adapted(ThrottleCompactor.scala:228)
    at swaydb.ActorWire.$anonfun$send$2$adapted(ActorWire.scala:148)
    at swaydb.ActorWire.$anonfun$actor$2$adapted(ActorWire.scala:61)
    at swaydb.Actor.swaydb$Actor$$receive(Actor.scala:773)
    at swaydb.Actor.$anonfun$swaydb$Actor$$basicWakeUp$1(Actor.scala:569)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:834)

  // Imports assumed for this snippet; package paths follow the SwayDB 0.16.x
  // docs (adjust to your version). better-files supplies File and
  // scala.concurrent.duration supplies 1.seconds.
  import swaydb._
  import swaydb.data.config._
  import swaydb.serializers.Default._
  import swaydb.persistent.DefaultConfigs
  import better.files.File
  import scala.concurrent.duration._

  /*
    Specifies if key-value IDs should be cached. There are over 1300 key-value IDs; if caching is disabled, an on-disk binary search is performed to find the IDs.
    https://swaydb.io/configuration/cacheKeyValueIds/?language=scala
   */
  val cacheKeyValueIdsOverride = false

  /*
    MemoryCache controls in-memory caching of raw Segment bytes and of parsed key-values; MemoryCache.off disables both.
    https://swaydb.io/configuration/memoryCache/?language=scala
   */
  val memoryCacheOverride = MemoryCache.off

  /*
    Setting mmap to true in LevelZero writes key-values to memory-mapped write-ahead log files.
    If false, java.nio.FileChannel is used.
    https://swaydb.io/configuration/mmap/map/?language=scala
   */
  val mmapDisabled = MMAP.Off(
    ForceSave.Off
  )

  /*
    Bounds the per-thread read-state cache; the commented-out alternative disables it.
   */
  val threadStateCacheOverride = ThreadStateCache.Limit(hashMapMaxSize = 10, maxProbe = 1)
  //ThreadStateCache.off

  /*
    Configures all persistent Segment files for the Level.
    https://swaydb.io/configuration/segmentConfig/?language=scala
   */
  val segmentConfigOverride = DefaultConfigs
    .segmentConfig()
    .copyWithMmap(
      mmapDisabled
    )
    .copyWithCacheSegmentBlocksOnCreate(false)
    .copyWithFileOpenIOStrategy(IOStrategy.SynchronisedIO(cacheOnAccess = true))
    .copyWithBlockIOStrategy(
      blockIOStrategy = (_) => IOStrategy.SynchronisedIO(cacheOnAccess = false)
    )

  // The following configuration disables caching on the file read/write layer:
  // with the limit set to 0, the TimeLoop sweeper actor closes files shortly after use.
  val fileCacheOverride = FileCache.On(
    0,
    ActorConfig.TimeLoop(
      name = s"${this.getClass.getName} - FileCache TimeLoop Actor",
      delay = 1.seconds,
      ec = DefaultExecutionContext.sweeperEC
    )
  )

  /*
    The sorted key index stores all keys in sorted order.
    https://swaydb.io/configuration/sortedKeyIndex/?language=scala&q=sortedKeyIndex
   */
  val sortedKeyIndexOverride = DefaultConfigs.sortedKeyIndex(false)
  /*
    hashIndexes can double random-read performance and reduce IOPS, which increases overall DB performance.
    https://swaydb.io/configuration/randomKeyIndex/?language=scala
   */
  val randomSearchIndexOverride = DefaultConfigs.randomSearchIndex(false)
  /*
    It gives faster random-read performance than sortedKeyIndex or linear search and is essential for fast forward & reverse iterations.
    https://swaydb.io/configuration/binarySearchIndex/?language=scala
   */
  val binarySearchIndexOverride = DefaultConfigs.binarySearchIndex(false)
  /*
    BloomFilter is a small byte array that can be created in each persistent segment and is used to determine if a key exists in the segment without actually searching the Segment.
    https://swaydb.io/configuration/mightContainKeyIndex/?language=scala
   */
  val mightContainIndexOverride = DefaultConfigs.mightContainIndex(false)

  /*
    Configures storage for values within a persistent segment.
    https://swaydb.io/configuration/valuesConfig/?language=scala
   */
  val valuesConfigOverride = DefaultConfigs.valuesConfig(false)

  def mapCreate(name: String) =
    persistent.Map[String, Int, Nothing, Glass](
      dir = File.newTemporaryDirectory(name).deleteOnExit().path,
      mmapMaps = mmapDisabled,
      cacheKeyValueIds = cacheKeyValueIdsOverride,
      segmentConfig = segmentConfigOverride,
      sortedKeyIndex = sortedKeyIndexOverride,
      randomSearchIndex = randomSearchIndexOverride,
      binarySearchIndex = binarySearchIndexOverride,
      mightContainIndex = mightContainIndexOverride,
      valuesConfig = valuesConfigOverride,
      fileCache = fileCacheOverride,
      memoryCache = memoryCacheOverride,
    )
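
A minimal sketch of how the map gets exercised, assuming the standard Map put/get API (the directory name and key volume are made up for illustration):

  // Create the map and write enough key-values for compaction to run.
  // Under the Glass bag the API returns plain values, so no IO/Future wrapping.
  def main(args: Array[String]): Unit = {
    val map = mapCreate("closed-channel-repro") // hypothetical directory name
    (1 to 1000000).foreach(i => map.put(s"key-$i", i))
    println(map.get("key-500000")) // Option[Int] under Glass
  }
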
@simerplaha
Owner

simerplaha commented Mar 29, 2022

This will not cause any data consistency issues.

The problem here is known - a caching issue. Compaction is trying to merge a Segment file through a FileChannel that has already been closed; it should attempt to reopen the file instead. If it keeps failing, this could stop compaction from making progress.
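
The shape of that fix is a reopen-and-retry guard at the file layer. A generic sketch of the pattern, as a hypothetical illustration only (SwayDB's actual ChannelFile/DBFile code is structured differently):

import java.nio.ByteBuffer
import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Path, StandardOpenOption}

// Hypothetical sketch: retry a read once after reopening a channel that
// was closed underneath us (e.g. by a file sweeper), instead of letting
// the ClosedChannelException fail the compaction job.
final class ReopeningChannel(path: Path) {

  @volatile private var channel: FileChannel =
    FileChannel.open(path, StandardOpenOption.READ)

  def read(buffer: ByteBuffer, position: Long): Int =
    try channel.read(buffer, position)
    catch {
      case _: ClosedChannelException =>
        synchronized {
          if (!channel.isOpen)
            channel = FileChannel.open(path, StandardOpenOption.READ)
        }
        channel.read(buffer, position)
    }
}

A single retry keeps genuine failures (e.g. a deleted file) visible while letting transient sweeper-induced closes heal themselves.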

I've been wanting to fix this but just haven't had the time.

Thank you for reporting these issues.
