Skip to content

Commit

Permalink
refactor: post announce logic to save to separate zset instead of to …
Browse files Browse the repository at this point in the history
…topic events, closes #12536
  • Loading branch information
julianlam committed May 1, 2024
1 parent 5e20319 commit 119800d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 28 deletions.
30 changes: 2 additions & 28 deletions src/activitypub/inbox.js
Expand Up @@ -173,23 +173,7 @@ inbox.announce = async (req) => {
winston.info(`[activitypub/inbox/announce] Parsing id ${pid}`);

if (!cid) { // Topic events from actors followed by users only
// No double-announce allowed
const existing = await topics.events.find(tid, {
type: 'announce',
uid: actor,
pid,
});
if (existing.length) {
await topics.events.purge(tid, existing);
}

await topics.events.log(tid, {
type: 'announce',
uid: actor,
href: `/post/${encodeURIComponent(pid)}`,
pid,
timestamp,
});
await activitypub.notes.announce.add(pid, actor, timestamp);
}
};

Expand Down Expand Up @@ -389,17 +373,7 @@ inbox.undo = async (req) => {
winston.verbose(`[activitypub/inbox/undo] Attempted to undo announce of ${id} but couldn't find it, so doing nothing.`);
}

const tid = await posts.getPostField(id, 'tid');
const existing = await topics.events.find(tid, {
type: 'announce',
uid: actor,
pid: id,
});

if (existing.length) {
await topics.events.purge(tid, existing);
}

await activitypub.notes.announce.remove(id, actor);
notifications.rescind(`announce:post:${id}:uid:${actor}`);
break;
}
Expand Down
46 changes: 46 additions & 0 deletions src/activitypub/notes.js
Expand Up @@ -304,3 +304,49 @@ Notes.getCategoryFollowers = async (cid) => {

return uids;
};

Notes.announce = {};

Notes.announce.list = async ({ pid, tid }) => {
let pids = [];
if (pid) {
pids = [pid];
} else if (tid) {
let mainPid;
([pids, mainPid] = await Promise.all([
db.getSortedSetMembers(`tid:${tid}:posts`),
topics.getTopicField(tid, 'mainPid'),
]));
pids.unshift(mainPid);
}

if (!pids.length) {
return [];
}

const keys = pids.map(pid => `pid:${pid}:announces`);
let announces = await db.getSortedSetsMembersWithScores(keys);
announces = announces.reduce((memo, cur, idx) => {
if (cur.length) {
const pid = pids[idx];
cur.forEach(({ value: actor, score: timestamp }) => {
memo.push({ pid, actor, timestamp });
});
}
return memo;
}, []);

return announces;
};

Notes.announce.add = async (pid, actor, timestamp = Date.now()) => {
await db.sortedSetAdd(`pid:${pid}:announces`, timestamp, actor);
};

Notes.announce.remove = async (pid, actor) => {
await db.sortedSetRemove(`pid:${pid}:announces`, actor);
};

Notes.announce.removeAll = async (pid) => {
await db.delete(`pid:${pid}:announces`);
};
14 changes: 14 additions & 0 deletions src/topics/events.js
Expand Up @@ -10,6 +10,7 @@ const categories = require('../categories');
const plugins = require('../plugins');
const translator = require('../translator');
const privileges = require('../privileges');
const activitypub = require('../activitypub');
const utils = require('../utils');
const helpers = require('../helpers');

Expand Down Expand Up @@ -176,6 +177,19 @@ async function modifyEvent({ tid, uid, eventIds, timestamps, events }) {
});
}

// Add post announces
const announces = await activitypub.notes.announce.list({ tid });
announces.forEach(({ actor, pid, timestamp }) => {
events.push({
type: 'announce',
uid: actor,
href: `/post/${encodeURIComponent(pid)}`,
pid,
timestamp,
});
timestamps.push(timestamp);
});

const [users, fromCategories, userSettings] = await Promise.all([
getUserInfo(events.map(event => event.uid).filter(Boolean)),
getCategoryInfo(events.map(event => event.fromCid).filter(Boolean)),
Expand Down
38 changes: 38 additions & 0 deletions src/upgrades/4.0.0/announces_zset.js
@@ -0,0 +1,38 @@
'use strict';

const db = require('../../database');
const batch = require('../../batch');
const topics = require('../../topics');

module.exports = {
name: 'Save ActivityPub Announces in their own per-post sorted set',
timestamp: Date.UTC(2024, 4, 1),
method: async function () {
const { progress } = this;
const bulkOp = [];

await batch.processSortedSet('topics:tid', async (tids, next) => {
await Promise.all(tids.map(async (tid) => {
const announces = await topics.events.find(tid, {
type: 'announce',
});

if (announces.length) {
await Promise.all(announces.map(async (eid) => {
const event = await db.getObject(`topicEvent:${eid}`);
if (['uid', 'pid', 'timestamp'].every(prop => event.hasOwnProperty(prop))) {
bulkOp.push([`pid:${event.pid}:announces`, event.timestamp, event.uid]);
}
}));

console.log('piurging', tid);
await topics.events.purge(tid, announces);
}
}));

progress.incr(tids.length);
}, { progress });

await db.sortedSetAddBulk(bulkOp);
},
};

0 comments on commit 119800d

Please sign in to comment.