Skip to content

Commit

Permalink
Apply collection level namespace config to dedicated channels
Browse files Browse the repository at this point in the history
  • Loading branch information
mfen committed Mar 25, 2022
1 parent b2be29d commit 852c172
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
10 changes: 5 additions & 5 deletions lib/mongo/Mutator.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export default class Mutator {
}

dispatchInsert(
config.optimistic,
config,
this._name,
config._channels,
doc
Expand Down Expand Up @@ -196,7 +196,7 @@ export default class Mutator {
const { fields } = getFields(modifier);

dispatchUpdate(
config.optimistic,
config,
this._name,
config._channels,
docs,
Expand Down Expand Up @@ -259,7 +259,7 @@ export default class Mutator {
}

dispatchInsert(
config.optimistic,
config,
this._name,
config._channels,
doc
Expand All @@ -284,7 +284,7 @@ export default class Mutator {
docs = this.find(selector).fetch();

dispatchUpdate(
config.optimistic,
config,
this._name,
config._channels,
docs,
Expand Down Expand Up @@ -358,7 +358,7 @@ export default class Mutator {
}

dispatchRemove(
config.optimistic,
config,
this._name,
config._channels,
docs
Expand Down
26 changes: 14 additions & 12 deletions lib/mongo/lib/dispatchers.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel';
import Config from '../../config';
import OptimisticInvocation from '../OptimisticInvocation';

const dispatchEvents = function(optimistic, collectionName, channels, events) {
const dispatchEvents = function(config, collectionName, channels, events) {
const { optimistic } = config;
if (optimistic) {
OptimisticInvocation.withValue(true, () => {
events.forEach(event => {
const docId = event[RedisPipe.DOC]._id;
const dedicatedChannel = getDedicatedChannel(
collectionName,
docId
);
const dedicatedChannel = getDedicatedChannel(collectionName, docId, config);
RedisSubscriptionManager.process(dedicatedChannel, event);

channels.forEach(channelName => {
Expand All @@ -37,20 +35,22 @@ const dispatchEvents = function(optimistic, collectionName, channels, events) {
channels.forEach(channelName => {
client.publish(channelName, message);
});

const docId = event[RedisPipe.DOC]._id;
const dedicatedChannel = getDedicatedChannel(collectionName, docId);
const dedicatedChannel = getDedicatedChannel(collectionName, docId, config);
client.publish(dedicatedChannel, message);
});
});
};

const dispatchUpdate = function(
optimistic,
config,
collectionName,
channels,
docs,
fields
) {
const { optimistic } = config;
const uid = optimistic ? RedisSubscriptionManager.uid : null;

const events = docs.map(doc => ({
Expand All @@ -60,10 +60,11 @@ const dispatchUpdate = function(
[RedisPipe.UID]: uid,
}));

dispatchEvents(optimistic, collectionName, channels, events);
dispatchEvents(config, collectionName, channels, events);
};

const dispatchRemove = function(optimistic, collectionName, channels, docs) {
const dispatchRemove = function(config, collectionName, channels, docs) {
const { optimistic } = config;
const uid = optimistic ? RedisSubscriptionManager.uid : null;

const events = docs.map(doc => ({
Expand All @@ -72,10 +73,11 @@ const dispatchRemove = function(optimistic, collectionName, channels, docs) {
[RedisPipe.UID]: uid,
}));

dispatchEvents(optimistic, collectionName, channels, events);
dispatchEvents(config, collectionName, channels, events);
};

const dispatchInsert = function(optimistic, collectionName, channels, doc) {
const dispatchInsert = function(config, collectionName, channels, doc) {
const { optimistic } = config;
const uid = optimistic ? RedisSubscriptionManager.uid : null;

const event = {
Expand All @@ -84,7 +86,7 @@ const dispatchInsert = function(optimistic, collectionName, channels, doc) {
[RedisPipe.UID]: uid,
};

dispatchEvents(optimistic, collectionName, channels, [event]);
dispatchEvents(config, collectionName, channels, [event]);
};

export { dispatchInsert, dispatchUpdate, dispatchRemove };
2 changes: 1 addition & 1 deletion lib/redis/RedisSubscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export default class RedisSubscriber {
this.observableCollection.selector
);

return ids.map(id => getDedicatedChannel(collectionName, id));
return ids.map(id => getDedicatedChannel(collectionName, id, this.observableCollection.options));
default:
throw new Meteor.Error(
`Strategy could not be found: ${this.strategy}`
Expand Down
5 changes: 3 additions & 2 deletions lib/utils/getDedicatedChannel.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { MongoID } from 'meteor/mongo-id';
import getChannelName from './getChannelName';

export default function getDedicatedChannel(collectionName, docId){
const channelName = `${collectionName}::${MongoID.idStringify(docId)}`;
// TODO do we want to support the multiple `namespaces` config for dedicated channels?
export default function getDedicatedChannel(collectionName, docId, { namespace }){
const channelName = `${namespace ? (namespace + '::') : ''}${collectionName}::${MongoID.idStringify(docId)}`;
return getChannelName(channelName);
}

0 comments on commit 852c172

Please sign in to comment.