Question about 06-Coding-with-Streams Multiplexing #48

Open
luokuning opened this issue Dec 2, 2021 · 0 comments

Hi there, I'm reading the multiplexing section in Chapter 6, and I ran into a problem that I can't figure out.
First I copied the contents of client.js and server.js into local files, but used file read/write streams instead of stdout and stderr, and the code runs smoothly without any problem. The complete code is shown below:

// client.mjs
import { connect } from 'net'
import { createReadStream } from 'fs'

function multiplexChannels (sources, destination) {
  let openChannels = sources.length
  for (let i = 0; i < sources.length; i++) {
    sources[i]
    .on('readable', function () { // ①
      let chunk
      while ((chunk = this.read()) !== null) {
        const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ②
        outBuff.writeUInt8(i, 0)
        outBuff.writeUInt32BE(chunk.length, 1)
        chunk.copy(outBuff, 5)
        console.log(`Sending packet to channel: ${i}`)
        destination.write(outBuff) // ③
      }
    })
    .on('end', () => { // ④
      if (--openChannels === 0) {
        destination.end()
      }
    })
  }
}

const socket = connect(3000, '127.0.0.1', () => {
+  const sourceFiles = [
+   createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
+ ]

  multiplexChannels(sourceFiles, socket)

})
// server.mjs
import { createWriteStream } from 'fs'
import { createServer } from 'net'
import { resolve } from 'path'

function demultiplexChannel (source, destinations) {
  let currentChannel = null
  let currentLength = null

  source
    .on('readable', () => { // ①
      let chunk
      if (currentChannel === null) { // ②
        chunk = source.read(1)
        currentChannel = chunk && chunk.readUInt8(0)
      }

      if (currentLength === null) { // ③
        chunk = source.read(4)
        currentLength = chunk && chunk.readUInt32BE(0)
        if (currentLength === null) {
          return null
        }
      }

      chunk = source.read(currentLength) // ④
      if (chunk === null) {
        return null
      }

      console.log(`Received packet from: ${currentChannel}`)
      destinations[currentChannel].write(chunk) // ⑤
      currentChannel = null
      currentLength = null
    })
    .on('end', () => { // ⑥
      destinations.forEach(destination => destination.end())
      console.log('Source channel closed')
    })
}

const server = createServer(socket => {
+  const destFiles = [
+   createWriteStream('/tmp/filetcp/1.txt'),
+   createWriteStream('/tmp/filetcp/2.txt'),
+  ]

  demultiplexChannel(socket, destFiles)
})
server.listen(3000, () => console.log('Server started'))

But when I add the highWaterMark option to the file read streams in client.mjs, like below:

-const sourceFiles = [
-    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
-    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
-  ]
+ const sourceFiles = [
+    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs', { highWaterMark: 8 }),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs', { highWaterMark: 8 }),
+  ]

and run client.mjs again, something strange happens: both the client and server sockets hang forever, and the server-generated files are incomplete, e.g. 1.txt contains only part of client.mjs in this case.
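
To make the effect of the option concrete, here is a small sketch of the framing that multiplexChannels applies to every chunk: 1 byte for the channel, 4 bytes for the big-endian length, then the payload. The helper names `framePacket` and `packetsFor` and the 1000-byte file size are hypothetical, and the packet count assumes that each `read()` on the file stream returns at most `highWaterMark` bytes.

```javascript
// Same layout as multiplexChannels: [channel: 1 byte][length: 4 bytes BE][payload]
function framePacket (channel, chunk) {
  const outBuff = Buffer.alloc(1 + 4 + chunk.length)
  outBuff.writeUInt8(channel, 0)
  outBuff.writeUInt32BE(chunk.length, 1)
  chunk.copy(outBuff, 5)
  return outBuff
}

// Assumption: every read() yields at most `highWaterMark` bytes,
// so a file is split into ceil(fileSize / highWaterMark) packets.
function packetsFor (fileSize, highWaterMark) {
  return Math.ceil(fileSize / highWaterMark)
}

const packet = framePacket(1, Buffer.from('import {'))
console.log(packet.length) // 13: 5 header bytes + 8 payload bytes
console.log(packetsFor(1000, 8)) // 125 packets for a hypothetical 1000-byte file
console.log(packetsFor(1000, 64 * 1024)) // 1 packet with the default 64 KiB highWaterMark
```

Under these assumptions, the change turns a couple of large packets into a long run of 13-byte packets, many of which can reach the server in a single TCP segment.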

I know that highWaterMark is the maximum number of bytes of the internal buffer, but I can't figure out what this has to do with the problem. Is there something I'm missing?
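
One possible explanation, offered as a hypothesis rather than a confirmed diagnosis: the 'readable' handler in demultiplexChannel reads at most one packet per event. With 8-byte chunks, several packets can arrive together, so the server may fall behind; once the client stops sending, no new 'readable' event drains the leftover bytes and 'end' is never emitted. The sketch below is a hypothetical variant that keeps the same wire format but loops until no complete packet remains in the buffer.

```javascript
// Hypothetical variant of demultiplexChannel: same packet layout, but every
// 'readable' event drains all complete packets instead of only the first one.
function demultiplexChannel (source, destinations) {
  let currentChannel = null
  let currentLength = null

  source
    .on('readable', () => {
      while (true) {
        let chunk
        if (currentChannel === null) {
          chunk = source.read(1)
          if (chunk === null) return // channel byte not available yet
          currentChannel = chunk.readUInt8(0)
        }

        if (currentLength === null) {
          chunk = source.read(4)
          if (chunk === null) return // length field not fully received yet
          currentLength = chunk.readUInt32BE(0)
        }

        chunk = source.read(currentLength)
        if (chunk === null) return // payload not fully received yet

        destinations[currentChannel].write(chunk)
        // Reset the state so the next iteration parses a new header.
        currentChannel = null
        currentLength = null
      }
    })
    .on('end', () => {
      destinations.forEach(destination => destination.end())
    })
}
```

Swapping this in for the demultiplexChannel in server.mjs is one way to test the hypothesis: if the hang disappears with `{ highWaterMark: 8 }`, the single read per event was likely the cause.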
