Embeddings: Related Discussions, Priority #9563
The problem with the p+1 prioritization strategy is that we'll never start generating retro similarities until ALL the discussions in the whole SaaS are processed. That will tank our QoS as measured by average time to completion. Instead, we want to prioritize by meetingId or discussionId.

The goal is to do this without promises that could take up to minutes to resolve, so e.g. having a super job that does everything sequentially isn't ideal. Super jobs are also bad because errors should be tied to a specific metadataId, not a group of them. We also don't want jobs to run longer than a couple of seconds so the server stays stateless & can handle restarts easily.

Ideally, what we want is a way to prioritize these flow jobs after their children, but before other, newer jobs of the same type.

TODO: refId is not globally unique. Add a table prefix?
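As a sketch of the per-discussion idea, priorities could be derived from a per-discussion sequence number (all names here are illustrative assumptions, not from the codebase):

```typescript
// Hypothetical sketch: derive priority from a per-discussion sequence number
// so each discussion's pipeline drains end-to-end before newer discussions
// start, instead of waiting for ALL embeddings across the whole SaaS.
type JobType = 'embed' | 'generateSimilarities'

// Smaller priority runs first (matching the convention used in this issue).
// Each discussion gets a block of 10 priorities; within the block, the
// similarities job sorts after that discussion's own embed jobs but before
// any later discussion's embeds.
const priorityFor = (jobType: JobType, discussionSeq: number): number => {
  const base = discussionSeq * 10
  return jobType === 'embed' ? base : base + 1
}
```

This keeps the "parent after children, but before newer jobs of the same type" ordering without any long-lived promises.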
If I understand this correctly, we only need 2 to 3 different priorities.
That's true! The only thing missing is a signal to say "generated embeddings for meeting X are done, now calculate similarities", or after that, "similarities have been generated, now rerank".

My current plan is to signal that with a "generateSimilarities" job that comes after the embeddings have been created. Given we only sort by priority, if it has p-1 then we can't guarantee that all embeddings have been created, but if it has a greater priority than the embeddings then "generateSimilarities" won't run until there are no more embeddings to process, which could take too long! Open to other suggestions!
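One alternative to pure priority ordering is an explicit readiness check: only run the similarities job once no embed jobs remain for that specific meeting. A minimal sketch (function and field names are my assumptions):

```typescript
// Hypothetical gating check: a generateSimilarities job for a meeting is
// runnable only once no embed jobs for that same meeting remain in the
// queue, regardless of global priority.
interface Job {
  jobType: 'embed' | 'generateSimilarities'
  meetingId: string
}

const canRunSimilarities = (queue: Job[], meetingId: string): boolean =>
  !queue.some((job) => job.jobType === 'embed' && job.meetingId === meetingId)
```

This scopes the "wait for embeddings" condition to one meeting, so embeds for other meetings can't block it.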
Building this out keeps feeling like it's getting more complex, not less, so I think I'm going down the wrong path. I spent some time looking around for best practices & the newest job queue is called hatchet: https://docs.hatchet.run/home/basics/workflows. Using hatchet would require another docker image, it is built on top of prisma, & it has very complex internals: https://github.com/hatchet-dev/hatchet/blob/7ab7290eece1018197d7856740ea46dc7d74eaae/internal/repository/prisma/dbsqlc/step_runs.sql#L420. I think we can get away with keeping the queue in a single table, but we can borrow greatly from the API.

The thing I like most about hatchet is the concept of 1 workflow having 1 or many steps. The problem with this is the waterfall nature: 1 step must be completed before moving onto the next. While we can do 2 jobs within the same step, we can't do 2 distinct steps at once. In other words, we have to wait for all 10 embed jobs to finish before moving onto the next step, even if the next step doesn't depend on those embed jobs! We need a way to support truly directed acyclic graphs, where nodes B and C both depend on A, and node D depends on both B and C, allowing B & C to run concurrently. Rereading hatchet, I'm not sure if they actually support DAGs because their nodes never bifurcate in the example.

So, a relatedDiscussions job comes in:

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 0,
  maxSteps: 1,
  steps: [{name: 'start', meetingId: 123}]
}
```

Since stepCount < maxSteps, the job gets picked up, the stepCount gets incremented, and the job gets returned to a worker:

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 1,
  maxSteps: 11,
  steps: [
    {name: 'start', meetingId: 123},
    {name: 'embed', discussionId: 'abc'},
    {name: 'embed', discussionId: 'def'},
    // ...
    {name: 'embed', discussionId: 'efg', isFinal: true}
  ]
}
```

If we did have 2 tables, we'd need a workflow queue table & a stepQueue table, which is what hatchet does. I think we can keep 1 table if we keep some rules:
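The pickup rule in the example job objects above can be sketched like this (illustrative only; field names mirror the example):

```typescript
// A minimal sketch of the single-table workflow pickup rule described above.
interface Step {
  name: string
  meetingId?: number
  discussionId?: string
  isFinal?: boolean
}

interface WorkflowJob {
  workflow: string
  stepCount: number
  maxSteps: number
  steps: Step[]
}

// A job is eligible for pickup while it still has steps left to hand out.
const isRunnable = (job: WorkflowJob): boolean => job.stepCount < job.maxSteps

// Picking a job up increments stepCount before returning it to a worker.
const pickUp = (job: WorkflowJob): WorkflowJob => ({
  ...job,
  stepCount: job.stepCount + 1,
})
```

A worker that finishes a step could then append new steps and raise maxSteps, which is how the example grows from 1 step to 11.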
@mattkrick safe to close now?
Our embedder uses a CPU and will take 2-3 seconds per discussion.
This means it probably won't be able to keep up, so we have to prioritize which items we want to process first.
Today, when a meeting ends we ping the embedder & tell it to embed all the discussions.
The related discussions PR pings the embedder when the voting stage ends & embeds all the discussions, then finds similar discussions & embeds those while leaving out the header that describes the meeting, which is O(nm).
To efficiently implement something like this, we can't embed on the fly. We'll want to persist the reranked embeddings so we don't embed the same doc over & over.
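A cache-first lookup along those lines might look like the following (the names and key scheme are hypothetical):

```typescript
// Hypothetical sketch: persist embeddings keyed by (model, documentId) so
// the same doc is never embedded twice for the same model. In production
// this would be a table, not an in-memory Map.
const embeddingCache = new Map<string, number[]>()

const getOrEmbed = (
  documentId: string,
  model: string,
  embed: (documentId: string) => number[] // the expensive 2-3s CPU call
): number[] => {
  const key = `${model}:${documentId}`
  const cached = embeddingCache.get(key)
  if (cached) return cached
  const vector = embed(documentId)
  embeddingCache.set(key, vector)
  return vector
}
```

Keying by model as well as document matters because the same doc may need distinct embeddings per model.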
We also need a way to support jobs of jobs. We can't process related discussions until we know all the previous discussions have been embedded. With multiple models, we'll need to query the job queue table to see if any jobs are still outstanding for that team. Do we add `teamId` to the job queue to get this?

Flow:
These dependencies between jobs are called job chaining, workflows, or just flows.
This case is somewhat special because we don't know the parent task ahead of time. For example, we could run an ad-hoc embedding job & now any parent job (existing or new) should wait until that child job is finished.
So, we either need a separate job queue, or we need our current job queue to be polymorphic.
In either case, we'll need a state machine and handlers for each state.
A single queue is probably best because then we don't have competing priorities.
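A shared lifecycle for the single polymorphic queue could be sketched like this (the `success` state and the retry transition are my assumptions, not from this issue):

```typescript
// Hypothetical lifecycle for a single polymorphic queue: every jobType
// shares the same states, and transitions are validated in one place.
type JobState = 'queued' | 'running' | 'success' | 'error'

const transitions: Record<JobState, JobState[]> = {
  queued: ['running'],
  running: ['success', 'error'],
  success: [], // terminal
  error: ['queued'], // assumed retry path
}

const canTransition = (from: JobState, to: JobState): boolean =>
  transitions[from].includes(to)
```

Centralizing the transition table keeps every jobType on the same lifecycle, which is what makes a single queue workable.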
New flow:
- A `generateRetroSimilarities` job is created, which includes the `discussionId` to use as the comparison. That's kept in the `data` JSONB in the job queue. The priority of this job is p+1, where p is the priority of all the children tasks (default 50, smaller runs first).
- When `generateRetroSimilarities` gets picked, it performs the cosine similarity search against `data.discussionId` and produces a list of `discussionIds`. Some may already have rerank embeddings, some may not. It checks to see if all the embeddings are available.
- For each that is not, an `embed` job is created with priority = p - 1 and `data = {type: 'retrospectiveDiscussionTopic_rerank', force?: false, model: X}`.
- If all are available, it performs a rerank and publishes.

Changes to implement:
- States become `queued, running, error` instead of `embedding`.
- `embeddingMetadataId, model` are moved to the `data` JSONB column.
- A `jobType` column is added. It's a string for now, but the enum is `embed | generateRetroSimilarities | rerankRetroSimilarities`.
- The shapes of `data` are defined in TypeScript for each job type.
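The per-jobType `data` shapes could look like this in TypeScript (the exact fields are assumptions based on the description above):

```typescript
// Hypothetical per-jobType shapes for the `data` JSONB column.
type JobType = 'embed' | 'generateRetroSimilarities' | 'rerankRetroSimilarities'
type JobState = 'queued' | 'running' | 'error'

interface EmbedData {
  embeddingMetadataId: number
  model: string
  type?: 'retrospectiveDiscussionTopic_rerank'
  force?: boolean
}

interface GenerateRetroSimilaritiesData {
  discussionId: string
}

// One row shape for the whole polymorphic queue; jobType narrows data.
interface JobRow<T extends JobType, D> {
  jobType: T
  state: JobState
  priority: number // default 50, smaller runs first
  data: D
}

const makeEmbedJob = (
  embeddingMetadataId: number,
  model: string,
  priority = 50
): JobRow<'embed', EmbedData> => ({
  jobType: 'embed',
  state: 'queued',
  priority,
  data: {embeddingMetadataId, model},
})
```

Typing `data` per jobType means workers can switch on `jobType` and get the right payload shape without casts.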