Skip to content

coherence-community/coherence-rx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

63 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Oracle Coherence Reactive Extensions (Rx)

CoherenceRx project provides reactive API for Oracle Coherence in-memory data grid, based on the popular RxJava library.

It is implemented as a thin wrapper around Oracle Coherence Asynchronous API, which implies that it requires Coherence 12.2.1 or a newer release.

Why CoherenceRx?

Reactive Programming is somewhat of an all-or-nothing proposition, or as Andre Staltz pointed out in his excellent tutorial:

Everything is a Stream

When you are writing reactive application and need to access data source that doesn't provide a reactive API, life can get complicated. In order to simplify our users' lives we decided to implement CoherenceRx and release it as an open source add-on for Coherence.

Using CoherenceRx

The easiest way to include CoherenceRx into your own project is to add it as a Maven dependency (along with Coherence itself and RxJava):

<dependency>
  <groupId>com.oracle.coherence</groupId>
  <artifactId>coherence</artifactId>
  <version>${coherence.version}</version>
</dependency>

<dependency>
  <groupId>com.oracle.coherence</groupId>
  <artifactId>coherence-rx</artifactId>
  <version>${coherence-rx.version}</version>
</dependency>

<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>${rxjava.version}</version>
</dependency>

and configure versions within Maven properties section:

<coherence.version>12.2.1-0-0</coherence.version>
<coherence-rx.version>1.0.0</coherence-rx.version>
<rxjava.version>1.1.0</rxjava.version>

Once you have the necessary dependencies properly configured, you can use the static RxNamedCache.rx method to create an instance of RxNamedCache:

NamedCache<Long, Product>   cache   = CacheFactory.getTypedCache("trades", withTypes(Long.class, Product.class));
RxNamedCache<Long, Product> rxCache = RxNamedCache.rx(cache);

Of course, you can also use static import for the RxNamedCache.rx method, which would make the code even simpler.

The RxNamedCache interface will be familiar to anyone who has used Coherence NamedCache API before, with one major difference: all the methods return an Observable.

For example, RxNamedCache.get will return an Observable<V> which will eventually emit the value of the cache entry for the given key and complete:

rxCache.get(5L).subscribe(product -> System.out.println("Got: " + product));

Another important difference is that the bulk read operations, such as getAll, keySet, entrySet and values do not return a single container value like their NamedCache counterparts, but an Observable stream of individual values:

rxCache.values().subscribe(product -> System.out.println("Got: " + product));

This is both more efficient, as it doesn't realize full result set on the client, and simpler, as it allows you to process each individual value as it is emitted by the underlying Observable.

For example, if you wanted to process batches of 10 products at a time, you could trivially accomplish that using buffer operation:

rxCache.values()
       .buffer(10)
       .subscribe(productList -> System.out.println("Got: " + productList));

Observing Event Streams

Oracle Coherence provides rich event notification functionality, so it only made sense to provide an adapter that allows you to use RxJava to process stream of event notifications.

CoherenceRx introduces ObservableMapListener, which extends RxJava Observable and implements Coherence MapListener interface. The ObservableMapListener simply propagates each received event to all of its subscribers:

ObservableMapListener<Long, Product> listener = ObservableMapListener.create();
listener.subscribe(System.out::println);

cache.addMapListener(listener);

Of course, the above is not very interesting, and could be easily achieved using standard SimpleMapListener as well. But it becomes a lot more interesting when you start applying various RxJava operators to transform, filter and even combine event streams:

ObservableMapListener<Long, Trade> listener = ObservableMapListener.create();
listener.filter(evt -> evt.getId() == MapEvent.ENTRY_INSERTED)
        .map(MapEvent::getNewValue)
        .buffer(10, TimeUnit.SECONDS)
        .subscribe(trades -> System.out.println("Trades placed in the last 10 seconds: " + trades));

cache.addMapListener(listener);

It is important to note that unlike Observables returned by the RxNamedCache methods, which are 'cold', the ObservableMapListener is a 'hot' Observable and will start receiving and processing the events as soon as it is registered with the cache using NamedCache.addMapListener method.

Because of that, it is important that you add Subscribers to it before calling NamedCache.addMapListener, or you could miss some events.

Building CoherenceRx

The following sections describe the steps necessary to build CoherenceRx from the source.

Prerequisites

In order to build or use the Coherence Reactive Extensions you must have the following installed:

  1. Java 8 SE Development Kit or Runtime environment

  2. Maven 3.0.5 or above installed and configured

  3. Coherence 12.2.1.0.0 or above installed

Ensure the following environment variables are set:

JAVA_HOME -- Make sure that the JAVA_HOME environment variable points to the location of a JDK supported by the Oracle Coherence version you are using.

COHERENCE_HOME -- Make sure COHERENCE_HOME is set to point to your Coherence install directory. This is only required for the Maven install-file commands.

MAVEN_HOME -- If mvn command is not in your path then you should set MAVEN_HOME and then add MAVEN_HOME\bin to your PATH in a similar way to Java being added to the path below.

You must also ensure the java command is in the path.

E.g. for Linux/UNIX/Mac:

export PATH=$JAVA_HOME/bin:$PATH

For Windows:

set PATH=%JAVA_HOME%\bin;%PATH%

You must have Coherence installed into your local Maven repository. If you do not, then carry out the following, replacing the version number with the version of Coherence you have installed.

E.g. for Linux/UNIX/Mac:

mvn install:install-file\
    -Dfile=$COHERENCE_HOME/lib/coherence.jar\
    -DpomFile=$COHERENCE_HOME/plugins/maven/com/oracle/coherence/coherence/12.2.1/coherence.12.2.1.pom

E.g. for Windows:

mvn install:install-file\
    -Dfile=%COHERENCE_HOME%\lib\coherence.jar\
    -DpomFile=%COHERENCE_HOME%\plugins\maven\com\oracle\coherence\coherence\12.2.1\coherence.12.2.1.pom

Build Instructions

Build the Coherence Reactive Extensions by running:

mvn clean install

The target directory will contain a number of files:

coherence-rx-x.y.z.jar          - JAR file
coherence-rx-x.y.z-javadoc.jar  - javadoc
coherence-rx-x.y.z-sources.jar  - sources

(where x.y.z is the current version of the Coherence Reactive Extensions)

References

For more information on Oracle Coherence, please see the following links:

License

The Universal Permissive License (UPL), Version 1.0

About

Reactive Extensions (Rx) API for Oracle Coherence

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages