Skip to content

Commit

Permalink
fix: prevent recursive try-catch memory leak in mergeInternals
Browse files Browse the repository at this point in the history
  • Loading branch information
jonapgar-groupby committed Sep 11, 2023
1 parent 27855c6 commit f051afc
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,16 @@ export function mergeInternals<T, R>(
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;


let projected;
try {
projected = project(value, index++);
} catch (err) {
subscriber.error(err)
return
}
// Start our inner subscription.
innerFrom(project(value, index++)).subscribe(
innerFrom(projected).subscribe(
createOperatorSubscriber(
subscriber,
(innerValue) => {
Expand Down Expand Up @@ -97,35 +104,27 @@ export function mergeInternals<T, R>(
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
subscriber.error(err);
}
// Check to see if we can complete, and complete if so.
checkComplete();
}
}
)
Expand Down

0 comments on commit f051afc

Please sign in to comment.