forked from pellse/assembler

Functional, type-safe and stateless Java API for solving the N + 1 query problem in multi-databases and microservices aggregation


thaingo/assembler

 
 


Assembler

A lightweight library for efficiently assembling entities by querying and merging external datasources or by aggregating microservices.

More specifically it was designed as a very lightweight solution to resolve the N + 1 query problem when aggregating data, not only from database calls (e.g. Spring Data JPA, Hibernate) but from arbitrary datasources (relational databases, NoSQL, REST, local method calls, etc.).

One key feature is that the caller doesn't need to worry about the order of the data returned by the different datasources. For example, there is no need (in a relational database context) to modify any SQL query to add an ORDER BY clause, or (in a REST context) to modify the service implementation or manually sort the results of each call before triggering the aggregation process.
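To illustrate why ordering does not matter, here is a minimal plain-Java sketch of the general idea: results are indexed by their correlation id before being matched, so the order in which a datasource returns them is irrelevant. This is not the library's implementation; the class `OrderIndependentJoin`, the method `join` and the `{id, creditCard}` row layout are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not part of the Assembler API.
class OrderIndependentJoin {

    // Matches each customer id with its billing row, whatever order the rows arrive in.
    // Each row is a two-element array: {customerId, creditCardNumber}.
    static Map<Long, String> join(List<Long> customerIds, List<String[]> billingRows) {
        // Index the billing rows by customer id; arrival order no longer matters after this step.
        Map<Long, String> cardById = new HashMap<>();
        for (String[] row : billingRows) {
            cardById.put(Long.valueOf(row[0]), row[1]);
        }
        // Walk the customers in their own order and look each one up in the index.
        Map<Long, String> result = new LinkedHashMap<>();
        for (Long id : customerIds) {
            result.put(id, cardById.get(id));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L);
        List<String[]> rows = new ArrayList<>(Arrays.asList(
            new String[] {"3", "card-3"},
            new String[] {"1", "card-1"},
            new String[] {"2", "card-2"}));

        Map<Long, String> first = join(ids, rows);
        Collections.shuffle(rows); // simulate a datasource returning rows in another order
        Map<Long, String> second = join(ids, rows);

        System.out.println(first.equals(second)); // prints "true"
    }
}
```

Because the lookup goes through a map keyed by id, no ORDER BY clause or manual sort is needed on either side of the join.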

More complete documentation is coming soon, with detailed explanations of how the library works and comparisons with other solutions. A dedicated series of blog posts is also planned at https://javatechnicalwealth.com/blog/

Supported Technologies

Currently the following implementations are supported (each is published on Maven Central with Javadocs):

  1. Java 8 Stream (synchronous and parallel)
  2. CompletableFuture
  3. Flux
  4. RxJava
  5. Akka Stream

You only need to include in your project's build file (Maven, Gradle) the library that corresponds to the type of reactive (or non-reactive) support needed (Java 8 Stream, CompletableFuture, Flux, RxJava, Akka Stream).

All modules above depend on the following modules:

  1. assembler-core
  2. assembler-util

Use Cases

One interesting use case would be for example to build a materialized view in a microservice architecture supporting Event Sourcing and Command Query Responsibility Segregation (CQRS). In this context, if you have an incoming stream of events where each event needs to be enriched with some sort of external data before being stored (e.g. stream of GPS coordinates enriched with location service and/or weather service), it would be convenient to be able to easily batch those events instead of hitting those external services for every single event.
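As a rough sketch of the batching idea described above, the following plain-Java example splits a list of incoming events into fixed-size batches, so that an external service could be called once per batch rather than once per event. The class `EventBatcher` and the method `batches` are hypothetical names used only for this illustration; they are not part of the library, and time-based batching is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of the Assembler API.
class EventBatcher {

    // Splits the events into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> batches(List<T> events, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < events.size(); start += batchSize) {
            int end = Math.min(start + batchSize, events.size());
            // Copy the sub-list so each batch is independent of the source list.
            result.add(new ArrayList<>(events.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            events.add(i);
        }
        // 25 events in batches of 10 means 3 calls to the external service instead of 25.
        System.out.println(batches(events, 10).size()); // prints "3"
    }
}
```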

Another use case could be when working with JPA projections (to fetch a minimal amount of data) instead of full-blown JPA entities (directly with Hibernate or through Spring Data repositories). In some cases the EntityManager might not be of any help in efficiently joining multiple entities together, or at least it might not be trivial to cache/optimize queries to avoid the N + 1 query problem.

Usage Examples

Assuming the following data model and API to return those entities:

@Data
@AllArgsConstructor
public class Customer {
    private final Long customerId;
    private final String name;
}

@Data
@AllArgsConstructor
public class BillingInfo {
    private final Long customerId;
    private final String creditCardNumber;
}

@Data
@AllArgsConstructor
public class OrderItem {
    private final Long customerId;
    private final String orderDescription;
    private final Double price;
}

@Data
@AllArgsConstructor
public class Transaction {
    private final Customer customer;
    private final BillingInfo billingInfo;
    private final List<OrderItem> orderItems;
}

List<Customer> getCustomers(); // This could be a REST call to an already existing microservice
List<BillingInfo> getBillingInfoForCustomers(List<Long> customerIds); // This could be a call to MongoDB
List<OrderItem> getAllOrdersForCustomers(List<Long> customerIds); // This could be a call to an Oracle database

So if getCustomers() returns 50 customers, instead of making one additional call per customerId to retrieve each customer's associated BillingInfo (which would result in 50 additional network calls, hence the N + 1 query problem), we can make a single additional call to retrieve all BillingInfos for all Customers returned by getCustomers() at once. The same applies to OrderItems. This implies, though, that combining Customers, BillingInfos and OrderItems into Transactions, using customerId as a correlation id between all those entities, has to be done outside of those datasources, which is what this library was implemented for.

To build the Transaction entity, we can simply combine the invocations of the methods declared above (from StreamAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.stream.StreamAdapter.streamAdapter;

import static java.util.stream.Collectors.toList;

List<Transaction> transactions = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId, BillingInfo::new), // Default BillingInfo for null values
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(streamAdapter())
    .assembleFromSupplier(this::getCustomers)
    .collect(toList());
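For comparison, the difference in call count between a per-customer lookup and a batched lookup can be sketched in plain Java as follows. This is only an illustration of the N + 1 problem itself, not of how the library works internally; the class `BatchedLookupDemo`, its methods and the simulated datasource are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical illustration, not part of the Assembler API.
class BatchedLookupDemo {

    // Counts how many times the simulated datasource is hit.
    static final AtomicInteger remoteCalls = new AtomicInteger();

    // Simulated datasource: one "remote call" per invocation, however many ids are passed.
    // The ids are assumed to be unique.
    static Map<Long, String> billingInfoFor(List<Long> customerIds) {
        remoteCalls.incrementAndGet();
        return customerIds.stream()
            .collect(Collectors.toMap(id -> id, id -> "card-" + id));
    }

    // N + 1 style: one lookup per customer.
    static int callsWithPerCustomerLookup(List<Long> customerIds) {
        remoteCalls.set(0);
        for (Long id : customerIds) {
            billingInfoFor(Collections.singletonList(id));
        }
        return remoteCalls.get();
    }

    // Batched style: a single lookup for all customers.
    static int callsWithBatchedLookup(List<Long> customerIds) {
        remoteCalls.set(0);
        billingInfoFor(customerIds);
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        List<Long> ids = LongStream.rangeClosed(1, 50).boxed().collect(Collectors.toList());
        System.out.println(callsWithPerCustomerLookup(ids)); // prints "50"
        System.out.println(callsWithBatchedLookup(ids));     // prints "1"
    }
}
```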

It is also possible to bind to a different execution engine (e.g. for parallel processing) just by switching to a different AssemblerAdapter implementation. For example, to support the aggregation process through CompletableFuture, just plug in a CompletableFutureAdapter instead (from CompletableFutureAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.future.CompletableFutureAdapter.completableFutureAdapter;

CompletableFuture<List<Transaction>> transactions = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId, BillingInfo::new),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(completableFutureAdapter())
    .assembleFromSupplier(this::getCustomers);

Reactive support is also provided through the Spring Project Reactor to asynchronously retrieve all data to be aggregated, for example (from FluxAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.flux.FluxAdapter.fluxAdapter;

Flux<Transaction> transactionFlux = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(fluxAdapter(elastic()))
    .assembleFromSupplier(this::getCustomers);

or by reusing the same Assembler instance as a transformation step within a Flux:

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(fluxAdapter()); // Parallel scheduler used by default

Flux<Transaction> transactionFlux = Flux.fromIterable(getCustomers()) // or just getCustomerFlux()
    .bufferTimeout(10, ofSeconds(5)) // every 5 seconds or max of 10 customers, returns Flux<List<Customer>>
    .flatMap(assembler::assemble); // same as flatMap(customerList -> assembler.assemble(customerList))

In addition to the Flux implementation, RxJava is also supported through Observables and Flowables.

With Observable support (from ObservableAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.rxjava.ObservableAdapter.observableAdapter;

Observable<Transaction> transactionObservable = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(observableAdapter(single()))
    .assembleFromSupplier(this::getCustomers);

With Flowable support (from FlowableAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.rxjava.FlowableAdapter.flowableAdapter;

Flowable<Transaction> transactionFlowable = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(flowableAdapter(single()))
    .assembleFromSupplier(this::getCustomers);

By using an AkkaSourceAdapter we can support the Akka Stream framework by creating instances of Akka Source (from AkkaSourceAssemblerTest):

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.akkastream.AkkaSourceAdapter.akkaSourceAdapter;

ActorSystem system = ActorSystem.create();
Materializer mat = ActorMaterializer.create(system);

Source<Transaction, NotUsed> transactionSource = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(akkaSourceAdapter()) // Sequential
    .assembleFromSupplier(this::getCustomers);

transactionSource.runWith(Sink.foreach(System.out::println), mat)
    .toCompletableFuture().get();

or

ActorSystem system = ActorSystem.create();
Materializer mat = ActorMaterializer.create(system);

Assembler<Customer, Source<Transaction, NotUsed>> assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(akkaSourceAdapter(true)); // Parallel

Source<Transaction, NotUsed> transactionSource = Source.from(getCustomers())
    .groupedWithin(3, ofSeconds(5))
    .flatMapConcat(assembler::assemble);

transactionSource.runWith(Sink.foreach(System.out::println), mat)
    .toCompletableFuture().get();

It is also possible to create an Akka Flow from the Assembler DSL:

ActorSystem system = ActorSystem.create();
Materializer mat = ActorMaterializer.create(system);

Assembler<Customer, Source<Transaction, NotUsed>> assembler = assemblerOf(Transaction.class)
    .withIdExtractor(Customer::getCustomerId)
    .withAssemblerRules(
        oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId),
        oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId),
        Transaction::new)
    .using(akkaSourceAdapter(Source::async)); // Custom underlying sources configuration

Source<Customer, NotUsed> customerSource = Source.from(getCustomers());

Flow<Customer, Transaction, NotUsed> transactionFlow = Flow.<Customer>create()
    .grouped(3)
    .flatMapConcat(assembler::assemble);
        
customerSource.via(transactionFlow)
    .runWith(Sink.foreach(System.out::println), mat)
    .toCompletableFuture().get();

Caching

In addition to providing helper functions to define mapping semantics (e.g. oneToOne(), manyToOne()), MapperUtils also provides a simple caching/memoization mechanism through the cached() wrapper method.

Below is a rewrite of the first example above in which one of the mappers is cached (the one for the getBillingInfoForCustomers MongoDB call). On subsequent invocations of the defined assembler, the mapper result from the first invocation is reused, which avoids hitting the datastore again:

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.util.query.MapperUtils.oneToOne;
import static io.github.pellse.util.query.MapperUtils.oneToManyAsList;

import static io.github.pellse.assembler.stream.StreamAdapter.streamAdapter;

import static java.util.stream.Collectors.toList;

import static io.github.pellse.util.query.MapperUtils.cached;

var billingInfoMapper = cached(oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId));
var allOrdersMapper = oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId);

var transactionAssembler = assemblerOf(Transaction.class)
        .withIdExtractor(Customer::getCustomerId)
        .withAssemblerRules(billingInfoMapper, allOrdersMapper, Transaction::new)
        .using(streamAdapter());

var transactionList = transactionAssembler
        .assembleFromSupplier(this::getCustomers)
        .collect(toList()); // Will invoke the getBillingInfoForCustomers() MongoDB remote call

var transactionList2 = transactionAssembler
        .assemble(getCustomers())
        .collect(toList()); // Will reuse the results returned from
                            // the first invocation of getBillingInfoForCustomers() above
                            // if the list of Customer IDs is the same (as defined by the list equals() method)
                            // for both invocations, no remote call here

This can be useful for aggregating dynamic data with static data, or with data that changes rarely or only on a predefined schedule (e.g. data that is refreshed by a batch job once a day).

Note that an overloaded version of the cached() method is also defined to allow plugging your own cache implementation.
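The caching behavior described above (reuse of a previous result when the list of ids is equal) can be pictured with a generic memoization wrapper. The sketch below is plain Java and only illustrates the idea; it is not the library's cached() implementation, and it makes no assumption about the signature of the overloaded cached() method. The class `MemoizeSketch` and the method `memoize` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration, not part of the Assembler API.
class MemoizeSketch {

    // Wraps a function so that each distinct key (as defined by equals/hashCode)
    // is computed at most once; later calls with an equal key reuse the stored result.
    // Keys should not be mutated after use, and null results are not cached.
    static <K, V> Function<K, V> memoize(Function<K, V> delegate) {
        Map<K, V> cache = new ConcurrentHashMap<>();
        return key -> cache.computeIfAbsent(key, delegate);
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();

        // Simulated remote query keyed by the list of customer ids.
        Function<List<Long>, String> query = memoize(ids -> {
            remoteCalls.incrementAndGet();
            return "billing info for " + ids;
        });

        query.apply(Arrays.asList(1L, 2L, 3L));
        // A different List instance with equal content hits the cache.
        query.apply(new ArrayList<>(Arrays.asList(1L, 2L, 3L)));

        System.out.println(remoteCalls.get()); // prints "1"
    }
}
```

Since List equality is based on content and order, two calls share a cache entry only when they pass the same ids in the same order.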

What's Next?

See the list of issues for improvements planned in the near future.
