Deepkit Broker #492

Open
5 of 12 tasks
marcj opened this issue Oct 22, 2023 · 0 comments

marcj commented Oct 22, 2023

Deepkit Broker has been around for quite some time but has played a niche role, mainly in the Debugger GUI (profiler), where it collects data from all workers and publishes it to the currently connected client. Besides the application lock, it also provided cache and pub/sub functionality.

However, we need a more robust implementation. Other frameworks ship proper caching and queue libraries, so we need to provide that too. I plan to provide the following under Deepkit Broker:

  • Distributed application lock (critical section lock)
  • Distributed typesafe Caching with Stampede prevention
  • Distributed typesafe message bus (pub/sub)
  • Distributed typesafe queue

"Distributed" because it should use consistent hashing to distribute load across multiple servers. "Typesafe" because it always serializes and deserializes automatically, with validation.
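To make the consistent hashing idea concrete, here is a minimal sketch of a hash ring with virtual nodes. It is not the planned Deepkit Broker implementation: `HashRingSketch`, the FNV-1a hash, and the replica count of 64 are all assumptions chosen for illustration.

```typescript
// FNV-1a 32-bit hash. Picked only because it needs no imports (assumption).
function fnv1a(input: string): number {
    let hash = 0x811c9dc5;
    for (let i = 0; i < input.length; i++) {
        hash ^= input.charCodeAt(i);
        hash = Math.imul(hash, 0x01000193) >>> 0;
    }
    return hash;
}

// Hypothetical ring: each server is placed at several points ("virtual nodes").
class HashRingSketch {
    private ring: { point: number; node: string }[] = [];
    private replicas: number;

    constructor(nodes: string[], replicas: number = 64) {
        this.replicas = replicas;
        for (const node of nodes) this.add(node);
    }

    add(node: string): void {
        for (let i = 0; i < this.replicas; i++) {
            this.ring.push({ point: fnv1a(`${node}#${i}`), node });
        }
        this.ring.sort((a, b) => a.point - b.point);
    }

    remove(node: string): void {
        this.ring = this.ring.filter((entry) => entry.node !== node);
    }

    // Owner is the first ring point at or after the key's hash, wrapping around.
    get(key: string): string {
        if (this.ring.length === 0) throw new Error('ring is empty');
        const hash = fnv1a(key);
        let lo = 0;
        let hi = this.ring.length;
        while (lo < hi) {
            const mid = (lo + hi) >>> 1;
            if (this.ring[mid].point < hash) lo = mid + 1;
            else hi = mid;
        }
        return this.ring[lo % this.ring.length].node;
    }
}
```

The useful property is that removing a server only reassigns the keys that server owned; keys owned by the remaining servers stay where they are, so most of a cache survives a pool change.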

Deepkit Broker is the foundation for other features like Deepkit Dashboard (monitoring, profiling, etc.), a new Job API (distributed job execution), and an upcoming distributed configuration service that allows, for example, applying server settings (like adding a new node to the caching pool) atomically. It also plays an important role in Deepkit RPC's peer-to-peer communication, where it acts as the message router.

The implementation is based on an adapter pattern, with Deepkit Broker Server as the first adapter (it is faster than Redis for most use cases and easy to deploy). Support for Redis and AWS will follow after the first release.
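As a rough illustration of what an adapter pattern buys here, the sketch below separates a cache facade from its storage backend. All names (`CacheAdapterSketch`, `MemoryAdapterSketch`, `CacheFacadeSketch`) are hypothetical and the real adapter interface may look quite different; JSON stands in for a typed serializer.

```typescript
// Hypothetical adapter contract: the smallest surface a cache backend must offer.
interface CacheAdapterSketch {
    get(key: string): Promise<string | undefined>;
    set(key: string, value: string): Promise<void>;
}

// In-memory backend, standing in for a real server, Redis, or AWS adapter.
class MemoryAdapterSketch implements CacheAdapterSketch {
    private store = new Map<string, string>();

    async get(key: string): Promise<string | undefined> {
        return this.store.get(key);
    }

    async set(key: string, value: string): Promise<void> {
        this.store.set(key, value);
    }
}

// The facade only talks to the interface, so swapping backends needs no changes here.
class CacheFacadeSketch {
    private adapter: CacheAdapterSketch;

    constructor(adapter: CacheAdapterSketch) {
        this.adapter = adapter;
    }

    async get<T>(key: string, build: () => Promise<T>): Promise<T> {
        const hit = await this.adapter.get(key);
        if (hit !== undefined) return JSON.parse(hit) as T;
        const value = await build();
        // JSON is a placeholder for a real typed serializer with validation.
        await this.adapter.set(key, JSON.stringify(value));
        return value;
    }
}
```

A Redis-backed adapter would implement the same two methods, and application code using the facade would stay unchanged.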

Examples

Application Lock

const lock = broker.lock('locky-xy');

await lock.acquire(); //blocks until acquired
try {
    // do critical stuff
} finally {
    await lock.release();
}
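To show what "blocks until acquired" means for callers, here is a single-process stand-in for the lock. It is only a sketch of the acquire/release contract, not a distributed lock: `LocalLockSketch` and `withLock` are hypothetical names, and a real broker lock would coordinate across processes and servers.

```typescript
// Hypothetical in-process lock with FIFO hand-over between waiters.
class LocalLockSketch {
    private held = false;
    private waiters: (() => void)[] = [];

    async acquire(): Promise<void> {
        if (!this.held) {
            this.held = true;
            return;
        }
        // Park until a release() hands ownership to this waiter.
        await new Promise<void>((resolve) => this.waiters.push(resolve));
    }

    release(): void {
        const next = this.waiters.shift();
        if (next) next(); // ownership passes directly, so `held` stays true
        else this.held = false;
    }
}

// Helper that guarantees release even if the critical section throws.
async function withLock<T>(lock: LocalLockSketch, fn: () => Promise<T>): Promise<T> {
    await lock.acquire();
    try {
        return await fn();
    } finally {
        lock.release();
    }
}
```

Wrapping the try/finally in a helper like `withLock` removes the risk of forgetting the release in application code.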

Caching

const cache = broker.cache('cache-xy');

type CacheEntry = {
    value: string;
};

const result = await cache.get('my-key', async (): Promise<CacheEntry> => {
    // this function is only called if the key is not in the cache
    return {value: 'my-value'};
});
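Stampede prevention means that many concurrent misses on the same key trigger only one build. One common way to get that within a single process is to share the in-flight promise, sketched below. `CoalescingCacheSketch` is a hypothetical name, and how Deepkit Broker achieves this across servers is not described here.

```typescript
// Hypothetical single-process cache that coalesces concurrent builds per key.
class CoalescingCacheSketch<T> {
    private values = new Map<string, T>();
    private inFlight = new Map<string, Promise<T>>();

    async get(key: string, build: () => Promise<T>): Promise<T> {
        if (this.values.has(key)) return this.values.get(key) as T;

        let pending = this.inFlight.get(key);
        if (!pending) {
            // First caller starts the build; later callers reuse the same promise.
            pending = build()
                .then((value) => {
                    this.values.set(key, value);
                    return value;
                })
                .finally(() => {
                    this.inFlight.delete(key);
                });
            this.inFlight.set(key, pending);
        }
        return pending;
    }
}
```

If the builder rejects, every waiting caller sees the same error and the next `get` starts a fresh build, since nothing was stored.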

Decoupling cache building and safe cache paths:

type User = { id: number, username: string, created: Date };

type UserCache = BrokerCacheKey<User, 'user/:id', { id: number }>;
broker.provideCache<UserCache>((parameters) => {
    return { id: parameters.id, username: 'peter', created: new Date() };
});

const userCache = broker.cache<UserCache>();
const entry: User = await userCache.get({ id: 2 });
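The path `'user/:id'` together with `{ id: 2 }` presumably resolves to a concrete cache key. A minimal sketch of such a resolution step follows; `resolveKey` is a hypothetical helper and the actual key format used by Deepkit Broker is an assumption.

```typescript
// Hypothetical helper: replaces each ":name" placeholder with the matching parameter.
function resolveKey(path: string, parameters: Record<string, string | number>): string {
    return path.replace(/:([A-Za-z_][A-Za-z0-9_]*)/g, (_match: string, name: string) => {
        if (!(name in parameters)) {
            throw new Error(`Missing cache key parameter "${name}"`);
        }
        // Encode so that a parameter value cannot inject extra path segments.
        return encodeURIComponent(String(parameters[name]));
    });
}

const key = resolveKey('user/:id', { id: 2 });
console.log(key); // user/2
```

Keeping the path in the type, as `BrokerCacheKey` does above, means callers pass structured parameters instead of assembling key strings by hand.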

Message Bus

type Events = { type: 'user-created', id: number } | { type: 'user-deleted', id: number };

const channel = broker.bus<Events>('/events/user');

await channel.subscribe((event) => {
    // receives { type: 'user-created', id: 2 } once it is published below
    console.log(event);
});

await channel.publish({ type: 'user-created', id: 2 });

Message Queue

type User = { id: number, username: string };
type QueueA = BrokerQueueChannel<User, 'user/registered'>;

const queue = broker.queue<QueueA>();

await queue.consume(async (message) => {
    // message.data is the deserialized and validated User payload
    console.log(message.data);
});

await queue.produce({ id: 3, username: 'peter' });
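The bus and the queue look similar in the examples, so it may help to spell out the difference as these terms are commonly used: a bus delivers each message to every subscriber, while a queue hands each message to exactly one consumer. The in-memory sketch below illustrates that under the assumption that Deepkit Broker follows the usual semantics; `LocalBusSketch` and `LocalQueueSketch` are hypothetical names, and round-robin dispatch is just one possible strategy.

```typescript
type Handler<T> = (message: T) => void;

// Fan-out: every subscriber sees every message.
class LocalBusSketch<T> {
    private subscribers: Handler<T>[] = [];

    subscribe(handler: Handler<T>): void {
        this.subscribers.push(handler);
    }

    publish(message: T): void {
        for (const subscriber of this.subscribers) subscriber(message);
    }
}

// Work distribution: each message goes to exactly one consumer (round-robin).
class LocalQueueSketch<T> {
    private consumers: Handler<T>[] = [];
    private pending: T[] = [];
    private cursor = 0;

    consume(handler: Handler<T>): void {
        this.consumers.push(handler);
        this.drain();
    }

    produce(message: T): void {
        this.pending.push(message);
        this.drain();
    }

    // Messages wait in `pending` until at least one consumer is registered.
    private drain(): void {
        if (this.consumers.length === 0) return;
        while (this.pending.length > 0) {
            const message = this.pending.shift() as T;
            const consumer = this.consumers[this.cursor % this.consumers.length];
            this.cursor++;
            consumer(message);
        }
    }
}
```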

Todo

  • Application Lock
  • Caching
  • Message Bus
  • Message Queue
    • message.release(seconds)
    • message.fail(error)
    • timeout
    • debugger gui
  • Server Persistence (snapshotting) using fork
  • Consistent Hashing
  • CLI to start server
  • CLI to start consumer

@marcj marcj added this to the Beta milestone Oct 22, 2023