Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
dvriend committed Aug 19, 2014
0 parents commit 0e6a0fd
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
.vagrant/
target/
.idea
*.iml
45 changes: 45 additions & 0 deletions README.md
@@ -0,0 +1,45 @@
# akka-persistence-inmemory
Akka-persistence-inmemory is a plugin for [akka-persistence](http://doc.akka.io/docs/akka/snapshot/scala/persistence.html)
that writes journal entries to an in-memory store. It supports writing journal messages and snapshots so its very useful
for testing your persistent actors.

# Installation
To include the plugin into your sbt project, add the following lines to your build.sbt file:

libraryDependencies += "com.github.dnvriend" %% "akka-persistence-inmemory" % "0.0.1"

For Maven users, add the following to the pom.xml

<dependency>
<groupId>com.github.dnvriend</groupId>
<artifactId>akka-persistence-inmemory_2.10</artifactId>
<version>0.0.1</version>
</dependency>

<dependency>
<groupId>com.github.dnvriend</groupId>
<artifactId>akka-persistence-inmemory_2.11</artifactId>
<version>0.0.1</version>
</dependency>

This version of akka-persistence-inmemory depends on Akka 2.3.4 and is cross-built against Scala 2.10.4 and 2.11.2
and should be binary compatible with Akka 2.3.5

# Configuration
Add the following to the application.conf:

```
akka {
persistence {
journal.plugin = "inmemory-journal"
snapshot-store.plugin = "inmemory-snapshot-store"
}
}
```

### What's new?

### 0.0.1
- Initial Release

Have fun!
73 changes: 73 additions & 0 deletions build.sbt
@@ -0,0 +1,73 @@
import SonatypeKeys._

organization := "com.github.dnvriend"

name := "akka-persistence-inmemory"

version := "0.0.1"

scalaVersion := "2.11.1"

crossScalaVersions := Seq("2.10.4", "2.11.2")

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

profileName := "com.github.dnvriend"

libraryDependencies ++= {
val akkaVersion = "2.3.4"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-experimental" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.1.2",
"org.slf4j" % "slf4j-nop" % "1.6.4",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "2.1.4" % "test",
"com.github.krasserm" %% "akka-persistence-testkit" % "0.3.4" % "test"
)
}

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v")

// publish settings

publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}

publishMavenStyle := true

publishArtifact in Test := false

pomIncludeRepository := { _ => false }

pomExtra := (
<url>https://github.com/dnvriend/akka-persistence-inmemory</url>
<licenses>
<license>
<name>BSD-style</name>
<url>http://www.opensource.org/licenses/bsd-license.php</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/dnvriend/akka-persistence-inmemory</url>
<connection>scm:git:git@github.com:dnvriend/akka-persistence-inmemory.git</connection>
</scm>
<developers>
<developer>
<id>you</id>
<name>Dennis Vriend</name>
<url>https://github.com/dnvriend</url>
</developer>
</developers>
)

xerial.sbt.Sonatype.sonatypeSettings
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=0.13.5
9 changes: 9 additions & 0 deletions project/plugins.sbt
@@ -0,0 +1,9 @@
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

addSbtPlugin("org.scala-sbt.plugins" % "sbt-onejar" % "0.8")

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1")

resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
12 changes: 12 additions & 0 deletions src/main/resources/reference.conf
@@ -0,0 +1,12 @@
akka {

}

inmemory-journal {
class = "akka.persistence.inmemory.journal.InMemoryJournal"
}

inmemory-snapshot-store {
class = "akka.persistence.inmemory.snapshot.InMemorySnapshotStore"
}

134 changes: 134 additions & 0 deletions src/main/scala/akka/persistence/inmemory/journal/InMemoryJournal.scala
@@ -0,0 +1,134 @@
package akka.persistence.inmemory.journal

import akka.actor.ActorLogging
import akka.persistence.{PersistentConfirmation, PersistentId, PersistentRepr}
import akka.persistence.journal.AsyncWriteJournal

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.immutable.Seq
import scala.concurrent.Future

class InMemoryJournal extends AsyncWriteJournal with ActorLogging {
implicit val ec = context.system.dispatcher
val journal: scala.collection.mutable.Map[String, List[PersistentRepr]] = new ConcurrentHashMap[String, List[PersistentRepr]].asScala

override def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = {
Future[Unit] {
val mess = messages
log.debug("writeMessages for {} persistent messages", mess.size)
mess.foreach { repr =>
import repr._
journal.get(persistenceId) match {
case None => journal.put(processorId, List(repr))
case Some(list) => journal.put(processorId, repr :: list)
}
}
}
}

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = {
Future[Unit] {
val perm = permanent
val pid = persistenceId
val toSeq = toSequenceNr
log.debug("asyncDeleteMessagesTo for processorId: {} to sequenceNr: {}, permanent: {}", pid, toSeq, perm)
perm match {
case true =>
journal.get(pid) match {
case None =>
case Some(list) => journal.put(pid, list.filterNot(_.sequenceNr <= toSeq))
}
case false =>
journal.get(pid) match {
case None =>
case Some(list) => journal.put(pid, list.map { repr =>
if(repr.sequenceNr <= toSeq) repr.update(deleted = true) else repr
})
}
}
}
}

@scala.deprecated("writeConfirmations will be removed, since Channels will be removed.")
override def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = {
Future[Unit] {
val confirms = confirmations
log.debug("writeConfirmations for {} messages", confirms.size)
confirms.foreach { confirmation =>
import confirmation._
journal.get(persistenceId) match {
case None =>
case Some(list) =>
journal.put(persistenceId, list.map { msg =>
if(msg.sequenceNr == sequenceNr) {
val confirmationIds = msg.confirms :+ confirmation.channelId
msg.update(confirms = confirmationIds)
} else msg
})
}
}
}
}

@scala.deprecated("asyncDeleteMessages will be removed.")
override def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = {
Future[Unit] {
val mids = messageIds
val perm = permanent
log.debug("Async delete {} messages, permanent: {}", mids.size, perm)

mids.foreach { persistentId =>
import persistentId._
perm match {
case true =>
journal.get(processorId) match {
case None =>
case Some(list) => journal.put(processorId, list.filterNot(_.sequenceNr == sequenceNr))
}
case false =>
journal.get(processorId) match {
case None =>
case Some(list) => journal.put(processorId, list.map { repr =>
if(repr.sequenceNr == sequenceNr) repr.update(deleted = true) else repr
})
}
}
}
}
}

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
Future[Long] {
val pid = persistenceId
val fromSeq = fromSequenceNr
log.debug("Async read for highest sequence number for processorId: {} (hint, seek from nr: {})", pid, fromSeq)
journal.get(pid) match {
case None => 0
case Some(list) => list.map(_.sequenceNr).max
}
}
}

override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = {
Future[Unit] {
val pid = persistenceId
val fromSeq = fromSequenceNr
val toSeq = toSequenceNr
val limit = max
val replay = replayCallback

log.debug("Async replay for processorId {}, from sequenceNr: {}, to sequenceNr: {} with max records: {}", pid, fromSeq, toSeq, limit)

journal.get(pid) match {
case None =>
case Some(list) =>
val takeMax = if(limit >= java.lang.Integer.MAX_VALUE) java.lang.Integer.MAX_VALUE else limit.toInt
list.filter { repr =>
repr.sequenceNr >= fromSeq && repr.sequenceNr <= toSeq
}.sortBy(_.sequenceNr)
.take(takeMax).foreach(replay)
}
}
}
}
@@ -0,0 +1,63 @@
package akka.persistence.inmemory.snapshot

import akka.actor.ActorLogging
import akka.persistence.serialization.Snapshot
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{SnapshotMetadata, Persistence, SelectedSnapshot, SnapshotSelectionCriteria}
import akka.serialization.SerializationExtension
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.concurrent.Future

class InMemorySnapshotStore extends SnapshotStore with ActorLogging {
implicit val system = context.system
val extension = Persistence(context.system)
val serialization = SerializationExtension(context.system)
implicit val executionContext = context.system.dispatcher

val snapshots: scala.collection.mutable.Map[SnapshotMetadata, Any] = new ConcurrentHashMap[SnapshotMetadata, Any].asScala

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
Future[Option[SelectedSnapshot]] {
val pid = persistenceId
val crit = criteria
log.debug("loading for persistenceId: {}, criteria: {}", pid, crit)
val snapshotEntries = snapshots.keySet.toList.filter { meta =>
meta.persistenceId == persistenceId && meta.sequenceNr <= criteria.maxSequenceNr
}.filterNot(_.timestamp > criteria.maxTimestamp)
.sortBy(_.sequenceNr)
.reverse.headOption

snapshotEntries match {
case None => None
case Some(meta) => snapshots.get(meta) match {
case None => None
case Some(value) => Some(SelectedSnapshot(meta, value))
}
}
}
}

override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
Future[Unit] {
val meta = metadata
val snap = snapshot
log.debug("Saving metadata: {}, snapshot: {}", meta, snap)
snapshots.put(meta, snap)
}
}

override def saved(metadata: SnapshotMetadata): Unit = log.debug("Saved: {}", metadata)

override def delete(metadata: SnapshotMetadata): Unit = {
log.debug("Deleting: {}", metadata)
snapshots.remove(metadata)
}

override def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = {
log.debug("Deleting for persistenceId: {} and criteria: {}", persistenceId, criteria)
snapshots.keySet.toList.filter { meta =>
meta.persistenceId == persistenceId && meta.sequenceNr <= criteria.maxSequenceNr
}.foreach(snapshots.remove)
}
}
9 changes: 9 additions & 0 deletions src/test/resources/application.conf
@@ -0,0 +1,9 @@
akka {
loglevel = debug

persistence {
journal.plugin = "inmemory-journal"
snapshot-store.plugin = "inmemory-snapshot-store"
}
}

34 changes: 34 additions & 0 deletions src/test/resources/logback.xml
@@ -0,0 +1,34 @@
<configuration>

<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<pattern>%date{HH:mm:ss} %-5level [%X{akkaSource}] - %msg%n</pattern>
</encoder>
</appender>

<appender name="file" class="ch.qos.logback.core.FileAppender">
<file>${communication-manager.log-file:-communication-manager.log}</file>
<encoder>
<pattern>%date{HH:mm:ss} %-5level [%X{akkaSource}] - %msg%n</pattern>
</encoder>
</appender>

<logger name="com.example" level="debug" additivity="false">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</logger>

<logger name="akka.actor" level="debug" additivity="false">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</logger>

<root level="debug">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</root>

</configuration>

0 comments on commit 0e6a0fd

Please sign in to comment.