Skip to content

Commit

Permalink
@tus/server: introduce POST_RECEIVE_V2 (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
Murderlon committed May 6, 2024
1 parent 86b8b9f commit 60698da
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 25 deletions.
7 changes: 7 additions & 0 deletions .changeset/four-owls-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tus/server': minor
'@tus/utils': minor
---

Introduce POST_RECEIVE_V2 event, which correctly fires during the stream write rather than
after it is finished
1 change: 0 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// eslint-disable-next-line unicorn/prefer-module
module.exports = {
root: true,
// This tells ESLint to load the config from the package `eslint-config-custom`
Expand Down
40 changes: 29 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion packages/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ Max file size (in bytes) allowed when uploading (`number` |
(`(req, id: string | null) => Promise<number> | number`)). When providing a function
during the OPTIONS request the id will be `null`.

#### `options.postReceiveInterval`

Interval in milliseconds for sending progress of an upload over
[`POST_RECEIVE_V2`](#eventspost_receive_v2) (`number`).

#### `options.relativeLocation`

Return a relative URL as the `Location` header to the client (`boolean`).
Expand Down Expand Up @@ -228,14 +233,36 @@ server.on(EVENTS.POST_CREATE, (req, res, upload => {})
#### `POST_RECEIVE`
Called every time a `PATCH` request is handled.
**Deprecated**.
Called every time an upload finished writing to the store. This event is emitted whenever
the request handling is completed (which is the same as `onUploadFinish`, almost the same
as `POST_FINISH`), whereas the `POST_RECEIVE_V2` event is emitted _while_ the request is
being handled.
```js
const {EVENTS} = require('@tus/server')
// ...
server.on(EVENTS.POST_RECEIVE, (req, res, upload => {})
```
#### `POST_RECEIVE_V2`
Called every [`postReceiveInterval`](#optionspostreceiveinterval) milliseconds for every
upload while it‘s being written to the store.
This means you are not guaranteed to get (all) events for an upload. For instance if
`postReceiveInterval` is set to 1000ms and an PATCH request takes 500ms, no event is emitted.
If the PATCH request takes 2500ms, you would get the offset at 2000ms, but not at 2500ms.
Use `POST_FINISH` if you need to know when an upload is done.
```js
const {EVENTS} = require('@tus/server')
// ...
server.on(EVENTS.POST_RECEIVE_V2, (req, upload => {})
```
#### `POST_FINISH`
Called an upload has completed and after a response has been sent to the client.
Expand Down
4 changes: 3 additions & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
},
"dependencies": {
"@tus/utils": "^0.1.0",
"debug": "^4.3.4"
"debug": "^4.3.4",
"lodash.throttle": "^4.1.1"
},
"devDependencies": {
"@types/debug": "^4.1.12",
"@types/lodash.throttle": "^4.1.9",
"@types/mocha": "^10.0.6",
"@types/node": "^20.11.5",
"@types/sinon": "^17.0.3",
Expand Down
28 changes: 21 additions & 7 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import EventEmitter from 'node:events'
import stream from 'node:stream/promises'
import {addAbortSignal, PassThrough} from 'node:stream'
import type http from 'node:http'

import type {ServerOptions} from '../types'
import type {DataStore, CancellationContext} from '@tus/utils'
import type http from 'node:http'
import {ERRORS, Upload, StreamLimiter} from '@tus/utils'
import stream from 'node:stream/promises'
import {addAbortSignal, PassThrough} from 'stream'
import {ERRORS, Upload, StreamLimiter, EVENTS} from '@tus/utils'
import throttle from 'lodash.throttle'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
Expand Down Expand Up @@ -127,8 +128,7 @@ export class BaseHandler extends EventEmitter {

protected writeToStore(
req: http.IncomingMessage,
id: string,
offset: number,
upload: Upload,
maxFileSize: number,
context: CancellationContext
) {
Expand All @@ -149,6 +149,20 @@ export class BaseHandler extends EventEmitter {
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})

const postReceive = throttle(
(offset: number) => {
this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
},
this.options.postReceiveInterval,
{leading: false}
)

let tempOffset = upload.offset
proxy.on('data', (chunk: Buffer) => {
tempOffset += chunk.byteLength
postReceive(tempOffset)
})

req.on('error', () => {
if (!proxy.closed) {
// we end the stream gracefully here so that we can upload the remaining bytes to the store
Expand All @@ -162,7 +176,7 @@ export class BaseHandler extends EventEmitter {
// which would result in a socket hangup error for the client.
stream
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
return this.store.write(stream as StreamLimiter, id, offset)
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
})
.then(resolve)
.catch(reject)
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export class PatchHandler extends BaseHandler {
}

const maxBodySize = await this.calculateMaxBodySize(req, upload, maxFileSize)
newOffset = await this.writeToStore(req, id, offset, maxBodySize, context)
newOffset = await this.writeToStore(req, upload, maxBodySize, context)
} finally {
await lock.unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export class PostHandler extends BaseHandler {
// The request MIGHT include a Content-Type header when using creation-with-upload extension
if (validateHeader('content-type', req.headers['content-type'])) {
const bodyMaxSize = await this.calculateMaxBodySize(req, upload, maxFileSize)
const newOffset = await this.writeToStore(req, id, 0, bodyMaxSize, context)
const newOffset = await this.writeToStore(req, upload, bodyMaxSize, context)

headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
Expand Down
6 changes: 6 additions & 0 deletions packages/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ interface TusEvents {
upload: Upload,
url: string
) => void
/** @deprecated this is almost the same as POST_FINISH, use POST_RECEIVE_V2 instead */
[EVENTS.POST_RECEIVE]: (
req: http.IncomingMessage,
res: http.ServerResponse,
upload: Upload
) => void
[EVENTS.POST_RECEIVE_V2]: (req: http.IncomingMessage, upload: Upload) => void
[EVENTS.POST_FINISH]: (
req: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -96,6 +98,10 @@ export class Server extends EventEmitter {
options.lockDrainTimeout = 3000
}

if (!options.postReceiveInterval) {
options.postReceiveInterval = 1000
}

const {datastore, ...rest} = options
this.options = rest as ServerOptions
this.datastore = datastore
Expand Down
5 changes: 5 additions & 0 deletions packages/server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export type ServerOptions = {
*/
allowedHeaders?: string[]

/**
* Interval in milliseconds for sending progress of an upload over `EVENTS.POST_RECEIVE_V2`
*/
postReceiveInterval?: number

/**
* Control how the upload URL is generated.
* @param req - The incoming HTTP request.
Expand Down
42 changes: 41 additions & 1 deletion packages/server/test/Server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import fs from 'node:fs/promises'
import path from 'node:path'

import request from 'supertest'
import Throttle from 'throttle'

import {Server} from '../src'
import {FileStore} from '@tus/file-store'
Expand Down Expand Up @@ -311,7 +312,6 @@ describe('Server', () => {
.set('Tus-Resumable', TUS_RESUMABLE)
.set('Upload-Length', length)
.then((res) => {
console.log(res.headers.location)
request(s)
.patch(removeProtocol(res.headers.location))
.send('test')
Expand Down Expand Up @@ -381,6 +381,46 @@ describe('Server', () => {
})
})

it('should receive throttled POST_RECEIVE event', (done) => {
const server = new Server({
path: '/test/output',
datastore: new FileStore({directory}),
postReceiveInterval: 500,
})
const size = 1024 * 1024
let received = 0
server.on(EVENTS.POST_RECEIVE_V2, () => {
received++
})

const originalWrite = server.datastore.write.bind(server.datastore)
// Slow down writing
sinon.stub(server.datastore, 'write').callsFake((stream, ...args) => {
// bytes per second a bit slower than exactly 2s so we can test getting four events
const throttleStream = new Throttle({bps: size / 2 - size / 10})
return originalWrite(stream.pipe(throttleStream), ...args)
})

const data = Buffer.alloc(size, 'a')

request(server.listen())
.post(server.options.path)
.set('Tus-Resumable', TUS_RESUMABLE)
.set('Upload-Length', data.byteLength.toString())
.then((res) => {
request(server.listen())
.patch(removeProtocol(res.headers.location))
.send(data)
.set('Tus-Resumable', TUS_RESUMABLE)
.set('Upload-Offset', '0')
.set('Content-Type', 'application/offset+octet-stream')
.end((err) => {
assert.equal(received, 4)
done(err)
})
})
})

it('should fire when an upload is finished', (done) => {
const length = Buffer.byteLength('test', 'utf8').toString()
server.on(EVENTS.POST_FINISH, (req, res, upload) => {
Expand Down
11 changes: 10 additions & 1 deletion packages/utils/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,19 @@ export const ERRORS = {
} as const

export const POST_CREATE = 'POST_CREATE' as const
/** @deprecated this is almost the same as POST_FINISH, use POST_RECEIVE_V2 instead */
export const POST_RECEIVE = 'POST_RECEIVE' as const
export const POST_RECEIVE_V2 = 'POST_RECEIVE_V2' as const
export const POST_FINISH = 'POST_FINISH' as const
export const POST_TERMINATE = 'POST_TERMINATE' as const
export const EVENTS = {POST_CREATE, POST_RECEIVE, POST_FINISH, POST_TERMINATE} as const
export const EVENTS = {
POST_CREATE,
/** @deprecated this is almost the same as POST_FINISH, use POST_RECEIVE_V2 instead */
POST_RECEIVE,
POST_RECEIVE_V2,
POST_FINISH,
POST_TERMINATE,
} as const

export const MAX_AGE = 86_400 as const
export const TUS_RESUMABLE = '1.0.0' as const
Expand Down

0 comments on commit 60698da

Please sign in to comment.