Embeddings: Related Discussions, Priority #9563

Open
5 of 6 tasks
mattkrick opened this issue Mar 25, 2024 · 5 comments
Comments

@mattkrick
Member

mattkrick commented Mar 25, 2024

Our embedder uses a CPU and will take 2-3 seconds per discussion.
This means it probably won't be able to keep up, so we have to prioritize which items we want to process first.

Today, when a meeting ends we ping the embedder & tell it to embed all the discussions.
The related discussions PR pings the embedder when the voting stage ends & embeds all the discussions, then finds similar discussions & re-embeds those while leaving out the header that describes the meeting, which is O(nm).

To efficiently implement something like this, we can't embed on the fly. We'll want to persist the reranked embeddings so we don't embed the same doc over & over.

We also need a way to support a job of jobs. We can't process related discussions until we know all the previous discussions have been embedded. With multiple models, we'll need to query the job queue table to see if any jobs are still outstanding for that team. Do we add teamId to the job queue to get this?

Flow:

  • When the voting stage ends, embedder receives a message to process all discussions for that given meetingId
  • Once all those discussions have been embedded (how to detect?) we perform a cosine similarity search on each and get related discussions
  • Once we get related discussions, we embed each of those using the rerank algorithm
  • Once all of those discussions have been embedded, we perform a cosine similarity search to get the new order, generate some text and publish it to the channel
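The cosine similarity step in the flow above is the only numeric piece. A minimal sketch, assuming embeddings are plain number arrays (in practice the search would likely live in the DB, e.g. via pgvector, rather than in app code):

```typescript
// Cosine similarity between two embedding vectors of equal length
const cosineSimilarity = (a: number[], b: number[]): number => {
  let dot = 0
  let normA = 0
  let normB = 0
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]
    normA += a[i] * a[i]
    normB += b[i] * b[i]
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB))
}

// Rank candidate discussions by similarity to the target embedding
const relatedDiscussions = (
  target: number[],
  candidates: {id: string; embedding: number[]}[]
) =>
  candidates
    .map(({id, embedding}) => ({id, score: cosineSimilarity(target, embedding)}))
    .sort((a, b) => b.score - a.score)
```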

These dependencies between jobs are called job chaining, workflows, or just flows.
This case is somewhat special because we don't know the parent task ahead of time. For example, we could run an ad-hoc embedding job & now any parent job (existing or new) should wait until that child job is finished.

So, we either need a separate job queue, or we need our current job queue to be polymorphic.
In either case, we'll need a state machine and a handler for each state.
A single queue is probably best because then we don't have competing priorities.

New flow:

  • When the voting stage ends, the embedder receives a message to process all discussions for that given meetingId. After it's done adding the metadata, which triggers job queue items, it adds a final job queue item called generateRetroSimilarities, which includes the discussionId to use as the comparison. That's kept in the data JSONB in the job queue. The priority of this job is p+1, where p is the priority of all the children tasks (default 50; smaller runs first)
  • When generateRetroSimilarities gets picked, it performs the cosine similarity search against data.discussionId and produces a list of discussionIds. Some may already have rerank embeddings, some may not. It checks to see if all the embeddings are available. For each that is not, an embed job is created with priority = p - 1 and data = {type: 'retrospectiveDiscussionTopic_rerank', force?: false, model: X}. If all are available, it performs a rerank and publishes.
  • When an embed job runs, if force is false & the data already exists in the model table, the job is complete.
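The priority math above could be sketched as follows (jobType and priority semantics are from this thread; the helper name and data shape are hypothetical):

```typescript
const DEFAULT_PRIORITY = 50 // smaller runs first

type QueueJob = {
  jobType: 'embed' | 'generateRetroSimilarities'
  priority: number
  data: Record<string, unknown>
}

// The aggregate job gets p + 1 so it sorts after its child embeds (p).
// Rerank embeds it spawns later would get p - 1 to jump ahead of new meetings.
const makeMeetingJobs = (
  discussionIds: string[],
  comparisonDiscussionId: string,
  p = DEFAULT_PRIORITY
): QueueJob[] => [
  ...discussionIds.map((discussionId) => ({
    jobType: 'embed' as const,
    priority: p,
    data: {discussionId}
  })),
  {
    jobType: 'generateRetroSimilarities' as const,
    priority: p + 1,
    data: {discussionId: comparisonDiscussionId}
  }
]
```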

Changes to implement:

  • JobQueue table enum is queued, running, error instead of embedding.
  • Job Queue columns embeddingMetadataId, model are moved to the data JSONB column
  • Job Queue column jobType is added. It's a string for now, but enum is embed | generateRetroSimilarities | rerankRetroSimilarities
  • Job Queue is ordered by priority ascending
  • processor is broken down by jobType
  • types for data are defined in typescript for each job type
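The typed data per jobType could be a discriminated union, so the processor narrows automatically. A sketch covering two of the three jobTypes (field names follow the list above; the rerank variant is omitted for brevity):

```typescript
type JobStatus = 'queued' | 'running' | 'error'

// embeddingMetadataId & model move from dedicated columns into the data JSONB
type EmbedData = {embeddingMetadataId: number; model: string; force?: boolean}
type GenerateRetroSimilaritiesData = {discussionId: string}

type JobQueueRow =
  | {jobType: 'embed'; status: JobStatus; priority: number; data: EmbedData}
  | {
      jobType: 'generateRetroSimilarities'
      status: JobStatus
      priority: number
      data: GenerateRetroSimilaritiesData
    }

// the processor is broken down by jobType; data is narrowed in each branch
const process = (job: JobQueueRow): string => {
  switch (job.jobType) {
    case 'embed':
      return `embed metadata ${job.data.embeddingMetadataId} with ${job.data.model}`
    case 'generateRetroSimilarities':
      return `similarities for discussion ${job.data.discussionId}`
  }
}
```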
@mattkrick
Member Author

mattkrick commented Apr 2, 2024

the problem with the p+1 prioritization strategy is we'll never start generating retro similarities until ALL the discussions in the whole SaaS are processed. That will tank our QoS as measured by average time to completion. Instead, we want to prioritize by meetingId or discussionId.

the goal is to do this without promises that could take up to minutes to resolve, so e.g. having a super job that does everything sequentially isn't ideal. superjobs are also bad because the errors should be tied to a specific metadataId, not a group of them. we also don't want jobs to run longer than a couple seconds so the server stays stateless & can handle restarts easily.

ideally, what we want is a way to prioritize these flow jobs after their children, but before other, newer jobs of the same type.
so, we'll need a 2nd sort order:

we'll need to orderBy(priority asc, flowStartAt asc). then, for the aggregate job, we just need to make sure it's at least 1ms later than the children to ensure that all children have been processed.

we may also need a flowId attached to the children so that if one errors we can error the whole flow. for related discussions, though, if one errors we can just ignore it.
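The two-key sort above could be sketched as a comparator (field names from the comment; the aggregate job shares its children's priority but has a flowStartAt at least 1ms later, so it drains last within its own flow while still beating newer flows):

```typescript
type FlowJob = {id: string; priority: number; flowStartAt: number} // flowStartAt in epoch ms

// orderBy(priority asc, flowStartAt asc)
const byQueueOrder = (a: FlowJob, b: FlowJob) =>
  a.priority - b.priority || a.flowStartAt - b.flowStartAt
```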

TODO: refId is not globally unique. add a table prefix?

@Dschoordsch
Contributor

If I understand this correctly, we only need 2 to 3 different priorities

  • p+1: old discussion backlog
  • p: embeddings generated when a meeting ends (if these weren't already generated)
  • p-1: adhoc embedding when searching for similarities

@mattkrick
Member Author

that's true! the only thing missing is a signal to say "generated embeddings for meeting X are done, now calculate similarities". or after that "similarities have been generated, now rerank"

My current plan is to signal that with a "generateSimilarities" job that comes after the embeddings have been created. Given we only sort by priority, if it has p-1, then we can't guarantee that all embeddings have been created, but if it has a greater priority than the embeddings then "generateSimilarities" won't run until there are no more embeddings to process, which could take too long! Open to other suggestions!

@mattkrick
Member Author

mattkrick commented Apr 3, 2024

Building this out keeps feeling like it's getting more complex, not less, so I think I'm going down the wrong path.
The problem arises with the interdependencies between jobs & how 1 job failure should trigger a failure for all future jobs within the same workflow.

I spent some time looking around for best practices & the newest job queue is called hatchet: https://docs.hatchet.run/home/basics/workflows.

Using hatchet would require another docker image & it is built on top of prisma & it has very complex internals: https://github.com/hatchet-dev/hatchet/blob/7ab7290eece1018197d7856740ea46dc7d74eaae/internal/repository/prisma/dbsqlc/step_runs.sql#L420. I think we can get away with keeping the queue in a single table, but we can borrow greatly from the API.

The thing I like most about hatchet is the concept of 1 workflow having 1 or many steps.
To emulate this, we can have each row in the job queue be a workflow & the status of the job queue could be the step that it's on.
Our use case is a little unique because some steps lead to n next steps: e.g. a "relatedDiscussions:start" step could lead to 10 "embed" processes, 1 for each discussion. How we could approach this is by having each workflow have a column called maxWorkersForStep and currentWorkersForStep. if we are on the embed step, which has 10 jobs to do, we could include all the discussionIds in the context. Then, for each worker that picks up that job, we increment the currentWorkersForStep until it equals maxWorkersForStep.

The problem with this is the waterfall nature: one step must be completed before moving on to the next. While we can do 2 jobs within the same step, we can't do 2 distinct steps at once. In other words, we have to wait for all 10 embed jobs to finish before moving onto the next step, even if the next step doesn't depend on those embed jobs! We need a way to support truly directed acyclic graphs, where node B and C both depend on A, and node D depends on both B and C, allowing B & C to run concurrently.

Rereading hatchet, I'm not sure if they actually support DAGs because their nodes never bifurcate in the example.
If we did want to support it, and only use 1 table, we'd need each step to return an array of children steps and then have a stepCounter instead of a currentWorkers where the stepCounter pointed to an index in an array of steps.

so, relatedDiscussions job comes in:

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 0,
  maxSteps: 1,
  steps: [{name: 'start', meetingId: 123}]
}
```

since stepCount < maxSteps, the job gets picked up, the stepCount gets incremented, and the job gets returned to a worker.
the worker picks step stepCount - 1 and performs the job using the handler for start with meetingId as the context.
The output from start updates the job row to the following

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 1,
  maxSteps: 11,
  steps: [{name: 'start', meetingId: 123}, {name: 'embed', discussionId: 'abc'}, {name: 'embed', discussionId: 'def'}...{name: 'embed', discussionId: 'efg', isFinal: true}]
}
```

where maxSteps === steps.length.
so, the workers go along, picking this workflow until stepCount == maxSteps. when the worker with isFinal === true completes its job, it adds the getSimilarities job. Ideally, we don't want this logic to exist in the worker callback itself, but rather in an orchestrator.

The problem is: what if each of n embed jobs results in n getSimilarities jobs? does it matter that all the embeds have to finish before the first getSimilarities can start? Ideally, we process each discussion to completion so the user gets feedback ASAP. Since we can't splice that job into the steps array, we'll need to push it, but assign it the same priority as its parent step... this is looking like a 2nd DB table 😦 If we do that, then we may as well ditch the workflows table & just have a steps table, which is what we do today. the problem is how we fail jobs & propagate that failure to future jobs.
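The pick-and-increment claim described above can be sketched as follows (names hypothetical; in SQL this would be a single atomic UPDATE ... RETURNING so two workers can't claim the same step):

```typescript
type Step = {name: string; isFinal?: boolean} & Record<string, unknown>
type WorkflowRow = {workflow: string; stepCount: number; maxSteps: number; steps: Step[]}

// A worker claims the next unclaimed step, if any, by bumping stepCount.
const claimNextStep = (row: WorkflowRow): Step | null => {
  if (row.stepCount >= row.maxSteps) return null
  const step = row.steps[row.stepCount]
  row.stepCount += 1
  return step
}
```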

If we did have 2 tables, we'd need a workflow queue table & a stepQueue table, which is what hatchet does.
after picking a workflow and incrementing the stepCount, we'd have to pick a step from the step queue table, based on priority. we probably need some concept of parent/child relationship to exist to know which jobs can start immediately & which ones need to wait for something else.

I think we can keep 1 table if we keep some rules:

  • when the step bifurcates, i.e. the step produces more than one succeeding step that can be run in parallel, then each bifurcation becomes a new flow with its own flowId
  • when steps converge, e.g. Z depends on both B and C, then the nearest parent must create that node with a waitOn array that includes the IDs of its children. For example, A would create Z at the same time as B, C. Z would have a waitOn: [B, C] value in its row. If B, C each go through their own steps creating B->D->F and C->E->G, then at each step Z.waitOn will be updated by removing the parent and replacing it with the child or children. For example, after step B, Z.waitOn was [B, C] but is now [D, C]. If there is a bifurcation, the step would replace itself with the IDs of all its children. The last child removes itself, e.g. at step F, Z.waitOn = [F, C]; when F completes, Z.waitOn becomes [C]. F then checks the return value of Z.waitOn: if it is an empty array, then Z.priority = F.priority, causing it to run next. A workflow has many steps; each step has a stepRun and a stepFinish. The stepRun can be shared across workflows, but the stepFinish, which handles convergence, is unique to the workflow, which makes sharing steps across workflows easier.
  • convergence is hard! let's try not to have any of those
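The waitOn bookkeeping in the convergence rule is small. A sketch of the replace-in-place semantics described above (function name is hypothetical):

```typescript
// When finishedId completes, splice its children's ids into the waiter's
// waitOn list. An empty childIds means that branch is fully done, so the
// finished step simply removes itself.
const updateWaitOn = (waitOn: string[], finishedId: string, childIds: string[]): string[] =>
  waitOn.flatMap((id) => (id === finishedId ? childIds : [id]))
```

So updateWaitOn(['B', 'C'], 'B', ['D']) yields ['D', 'C'], and updateWaitOn(['F', 'C'], 'F', []) yields ['C']; when the result is empty, the waiter is eligible to run.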

@jordanh
Contributor

jordanh commented May 20, 2024

@mattkrick safe to close now?
