Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/reduce number of docs synced bulk #2396

Merged
merged 11 commits into from
May 10, 2024
22 changes: 8 additions & 14 deletions services/libs/opensearch/src/service/member.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ export class MemberSyncService {

public async syncMembers(memberIds: string[], segmentIds?: string[]): Promise<IMemberSyncResult> {
const CONCURRENT_DATABASE_QUERIES = 10
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 100

// get all memberId-segmentId couples
const memberSegmentCouples: IMemberSegmentMatrix =
Expand Down Expand Up @@ -399,7 +399,9 @@ export class MemberSyncService {
})

// databaseStreams will create syncStreams items in processSegmentsStream, which'll later be used to sync to opensearch in bulk
if (databaseStream.length >= CONCURRENT_DATABASE_QUERIES) {
const isLastSegment = i === totalMemberIds - 1 && j === totalSegments - 1

if (isLastSegment || databaseStream.length >= CONCURRENT_DATABASE_QUERIES) {
await processSegmentsStream(databaseStream)
databaseStream = []
}
Expand All @@ -413,18 +415,10 @@ export class MemberSyncService {
syncStream = syncStream.slice(BULK_INDEX_DOCUMENT_BATCH_SIZE)
}

// if we're processing the last segment
if (i === totalMemberIds - 1 && j === totalSegments - 1) {
// check if there are remaining databaseStreams to process
if (databaseStream.length > 0) {
await processSegmentsStream(databaseStream)
}

// check if there are remaining syncStreams to process
if (syncStream.length > 0) {
await this.openSearchService.bulkIndex(OpenSearchIndex.MEMBERS, syncStream)
documentsIndexed += syncStream.length
}
// check if there are remaining syncStreams to process
if (isLastSegment && syncStream.length > 0) {
await this.openSearchService.bulkIndex(OpenSearchIndex.MEMBERS, syncStream)
documentsIndexed += syncStream.length
}
successfullySyncedMembers.push(memberId)
}
Expand Down
22 changes: 8 additions & 14 deletions services/libs/opensearch/src/service/organization.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ export class OrganizationSyncService {
segmentIds?: string[],
): Promise<IOrganizationSyncResult> {
const CONCURRENT_DATABASE_QUERIES = 5
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 100

// get all orgId-segmentId couples
const orgSegmentCouples: IOrganizationSegmentMatrix =
Expand Down Expand Up @@ -360,7 +360,9 @@ export class OrganizationSyncService {
})

// databaseStreams will create syncStreams items in processSegmentsStream, which'll later be used to sync to opensearch in bulk
if (databaseStream.length >= CONCURRENT_DATABASE_QUERIES) {
const isLastSegment = i === totalOrgIds - 1 && j === totalSegments - 1

if (isLastSegment || databaseStream.length >= CONCURRENT_DATABASE_QUERIES) {
await processSegmentsStream(databaseStream)
databaseStream = []
}
Expand All @@ -374,18 +376,10 @@ export class OrganizationSyncService {
syncStream = syncStream.slice(BULK_INDEX_DOCUMENT_BATCH_SIZE)
}

// if we're processing the last segment
if (i === totalOrgIds - 1 && j === totalSegments - 1) {
// check if there are remaining databaseStreams to process
if (databaseStream.length > 0) {
await processSegmentsStream(databaseStream)
}

// check if there are remaining syncStreams to process
if (syncStream.length > 0) {
await this.openSearchService.bulkIndex(OpenSearchIndex.ORGANIZATIONS, syncStream)
documentsIndexed += syncStream.length
}
// check if there are remaining syncStreams to process
if (isLastSegment && syncStream.length > 0) {
await this.openSearchService.bulkIndex(OpenSearchIndex.ORGANIZATIONS, syncStream)
documentsIndexed += syncStream.length
}
}
}
Expand Down