
If I want a transform stream to be usable as just a writer as well as a true transform, how do I stop pushing data to a possibly non-existent downstream? #4340

Open
1mike12 opened this issue Feb 7, 2024 · 0 comments

Details

So I have a custom transform stream that accumulates objects from upstream and runs an async task each time a predefined batch size is reached. The use case is to stream files and batch-insert their contents into a database.

My understanding is that a transform stream can be used either as a true transformer or just as a writer, i.e. the end of the stream pipeline.
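A minimal sketch of the writer-only mode, assuming Node.js 15 or later (for `stream/promises`); `sumSink` and `total` are hypothetical names for illustration. The transform calls `callback()` without data, so it never pushes anything. In current Node.js releases, `pipeline()` treats the last stream as the destination and settles once its writable side has finished, even though nothing reads its readable side.

```typescript
import { Readable, Transform } from "stream"
import { pipeline } from "stream/promises"

// A transform used purely as a sink: it consumes chunks but never pushes,
// so its readable side stays empty for the whole run.
let total = 0
const sumSink = new Transform({
  objectMode: true,
  transform(chunk: number, _encoding, callback) {
    total += chunk
    callback() // no data argument, so nothing is pushed downstream
  },
})

// As the last stage, pipeline() only waits for the writable side to finish.
const done = pipeline(Readable.from([1, 2, 3, 4]), sumSink).then(() => {
  console.log("total:", total) // total: 10
})
```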

However, if I want one stream that behaves correctly in either mode, does that just work automatically? If I push to the readable-side buffer but nothing is attached to the readable side, wouldn't my transform's readable buffer grow forever? I don't get it, and I can't find any discussion of this anywhere online.
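A small experiment that probes the "grow forever" concern, assuming a recent Node.js release (the `Transform` internals were reworked around v15, so older versions may differ in detail); `pusher` and `accepted` are hypothetical names. Note that `BatchStream` as posted below always calls `callback()` without data, so it never pushes and its readable buffer stays empty. The sketch instead uses a transform that does push, with nothing reading from it.

```typescript
import { Transform } from "stream"

// A transform that pushes every chunk it receives (callback(null, chunk)).
// highWaterMark: 4 is applied to both its writable and readable buffers.
const pusher = new Transform({
  objectMode: true,
  highWaterMark: 4,
  transform(chunk, _encoding, callback) {
    callback(null, chunk)
  },
})

// Write 10 objects while nothing is attached to the readable side.
const accepted: boolean[] = []
for (let i = 0; i < 10; i++) {
  accepted.push(pusher.write({ id: i }))
}

// Once the readable buffer reaches highWaterMark, the transform holds back its
// write callback, so later chunks wait in the writable buffer and write()
// returns false: the usual backpressure signal to the producer.
console.log("readable buffer:", pusher.readableLength) // 4
console.log("writable buffer:", pusher.writableLength)
console.log("write() results:", accepted)
```

So a pushing transform with no reader does not grow without bound; it stops making progress instead. A producer that respects `write()` returning `false` (as `pipe()` and `pipeline()` do) pauses, and the pipeline stalls once the readable buffer is full (16 objects by default in object mode).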

Node.js version

Not applicable.

Example code

import { Transform, TransformCallback } from "stream"

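// Collects incoming objects and hands each full batch (plus any remainder at
// the end) to asyncProcessFunction. The callbacks are called without data, so
// nothing is pushed to the readable side.
export class BatchStream extends Transform {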
  private readonly batchSize: number
  private readonly asyncProcessFunction: (batch: any[]) => Promise<any>
  private batch: any[]

  constructor(batchSize: number, asyncProcessFunction: (batch: any[]) => Promise<any>) {
    super({ objectMode: true })
    this.batchSize = batchSize
    this.asyncProcessFunction = asyncProcessFunction
    this.batch = []
  }

  _transform(chunk: any, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(chunk)

    if (this.batch.length >= this.batchSize) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

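  // Hands the current batch to asyncProcessFunction and starts a new one.
  // callback() is kept outside the try block so it cannot be invoked twice
  // (once on success and again from catch if code it runs synchronously throws).
  private async processBatch(callback: TransformCallback): Promise<void> {
    const batch = this.batch
    this.batch = []
    try {
      await this.asyncProcessFunction(batch)
    } catch (err) {
      callback(err as Error) // catch variables are `unknown` under strict TS
      return
    }
    callback()
  }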
}
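A sketch of one way the same class could serve both positions, under stated assumptions: `ForwardingBatchStream` and its `forward` constructor flag are hypothetical, not part of Node.js or of the code above, and the value returned by `asyncProcessFunction` is what gets forwarded. With `forward` off it behaves like `BatchStream` (nothing is pushed, so it can sit at the end of a pipeline); with it on, each batch result is passed to the callback, which pushes it for a downstream consumer.

```typescript
import { Readable, Transform, Writable } from "stream"
import type { TransformCallback } from "stream"
import { pipeline } from "stream/promises"

// Hypothetical variant of BatchStream with an explicit "forward results" switch.
class ForwardingBatchStream<T, R> extends Transform {
  private readonly batchSize: number
  private readonly asyncProcessFunction: (batch: T[]) => Promise<R>
  private readonly forward: boolean
  private batch: T[] = []

  constructor(batchSize: number, asyncProcessFunction: (batch: T[]) => Promise<R>, forward: boolean) {
    super({ objectMode: true })
    this.batchSize = batchSize
    this.asyncProcessFunction = asyncProcessFunction
    this.forward = forward
  }

  _transform(chunk: T, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(chunk)
    if (this.batch.length >= this.batchSize) void this.runBatch(callback)
    else callback()
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) void this.runBatch(callback)
    else callback()
  }

  private async runBatch(callback: TransformCallback): Promise<void> {
    const batch = this.batch
    this.batch = []
    let result: R
    try {
      result = await this.asyncProcessFunction(batch)
    } catch (err) {
      callback(err as Error)
      return
    }
    // callback(null, data) pushes data; callback() pushes nothing.
    if (this.forward) {
      callback(null, result)
    } else {
      callback()
    }
  }
}

async function demo(): Promise<{ sinkBatches: number[][]; downstream: number[] }> {
  // Writer-only: last stage of the pipeline, batches are processed but not pushed.
  const sinkBatches: number[][] = []
  const insertBatch = async (batch: number[]) => {
    sinkBatches.push(batch)
    return batch.length
  }
  await pipeline(Readable.from([1, 2, 3, 4, 5]), new ForwardingBatchStream(2, insertBatch, false))

  // True transform: each batch size is pushed to a downstream Writable.
  const downstream: number[] = []
  const sink = new Writable({
    objectMode: true,
    write(chunk: number, _encoding, callback) {
      downstream.push(chunk)
      callback()
    },
  })
  const countBatch = async (batch: number[]) => batch.length
  await pipeline(Readable.from([1, 2, 3, 4, 5]), new ForwardingBatchStream(2, countBatch, true), sink)

  return { sinkBatches, downstream }
}

const result = demo()
result.then((r) => console.log(r))
```

In this demo the writer-only run records the batches `[1, 2]`, `[3, 4]` and `[5]`, and the forwarding run delivers `2`, `2` and `1` downstream. If `forward` is on but nothing consumes the output, the stream stops processing new chunks once 16 unread results are buffered (the object-mode default highWaterMark), so the caller still has to pick the mode that matches where the stream sits.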

Operating system

n/a

Scope

code

Module and version

streams
