feat: support http stream response #2322
base: master
Conversation
Codecov Report
Patch coverage:
Additional details and impacted files

@@            Coverage Diff             @@
##           master    #2322      +/-   ##
==========================================
- Coverage   60.45%   60.04%   -0.41%
==========================================
  Files         422      422
  Lines       37507    37784     +277
==========================================
+ Hits        22673    22687      +14
- Misses      12597    12859     +262
- Partials     2237     2238       +1

☔ View full report in Codecov by Sentry.
Commits: feat: support http1 stream response · style: simplify code
I tried merging this part of the code and tested the large-file download scenario; the high memory usage problem is still there. The streaming code follows fasthttp, which implements segmented reads and writes, but it still caches the entire response body in a single buffer, so high memory usage cannot be avoided. I think handleChunk can be optimized further: each time a segment is read, write it into a temporary buffer that only holds the data from that read; once it has been written to the output stream, the data can be discarded.
Maybe fasthttp needs the dst to hold the full HTTP response body, but in our scenario we are not focusing on the full response body, so the buffer is just used to copy some bytes from upstream to mosn and write them to the downstream.
// copy from fasthttp github.com/valyala/fasthttp@v1.40.0/http.go:1374
func writeBodyToPipe(r *bufio.Reader, maxBodySize int, contentLength int, writeFunc func([]byte) error) (err error) {
	buf := make([]byte, 0)
Perf: give an initial capacity to avoid frequent growth.
ok
}

// copy from fasthttp github.com/valyala/fasthttp@v1.40.0/http.go:1374
func writeBodyToPipe(r *bufio.Reader, maxBodySize int, contentLength int, writeFunc func([]byte) error) (err error) {
maxBodySize seems to always be 0 — do we need this argument?
	return dst, nil
}

offset := len(dst)
Memory: why should we always append to the end of the buffer instead of starting at offset 0?
Thanks. I will reevaluate and optimize this.
Hi all, I found another problem. When the clientStreamConnection object passes the response body to the serverStream object, MOSN's pipeBuffer is used, with clientStreamConnection acting as the producer and serverStream as the consumer. In practice, when MOSN runs as a sidecar for inbound traffic, the production rate can far exceed the consumption rate (because local communication is much faster than remote communication), which makes the pipeBuffer keep growing and eventually consume too much memory.
I guess two factors contribute to this:
Here, let me try changing how writeBodyFixedSize writes back internally, switching to fixed-size streaming write-back.
Thanks for the reply~ However, I have tried both of these approaches; they help to some extent, but they do not fundamentally solve the pipeBuffer growth problem. In our scenario, MOSN works as a sidecar proxying inbound traffic, and when downloading large files the pipeBuffer still grows too fast.
}
data := buf[:n]
log.Proxy.Debugf(s.ctx, "[stream] [http] [stream response] receive data: %s", string(data))
if err = s.connection.conn.Write(buffer.NewIoBufferBytes(data)); err != nil {
This code reads data from the pipeBuffer and returns it to the client. If the client is a remote machine, this can take quite long, so the pipeBuffer is consumed much more slowly than it is filled and grows too fast. My idea is to either throttle how fast clientStream writes into the pipeBuffer, or drop the pipeBuffer entirely and change the data transfer from asynchronous to synchronous: when clientStream receives a packet, it hands it directly to serverStream to return.
As I understand it, what is actually needed here is backpressure: when the consumption rate drops, actively slow down the production rate.
Yes, exactly.
I think we still need to consider the scenarios here:
- In synchronous mode, the data piles up in the connection to the upstream.
- If we keep the current approach and transfer data through the pipeBuffer, the data may pile up inside MOSN.

@jizhuozhi do you have any good suggestions?
I just looked at fasthttp: streaming support was already merged in valyala/fasthttp#911, so let's see whether we can build on that feature directly.
Thanks, I will take a look.
We can use fasthttp's SetBodyStream here; let me rewrite it.
Reporting a problem: OnReceive is wrapped here (Lines 177 to 180 in cafbd5c).
This causes an issue: when a new request comes in while the stream response has not finished, processing is stuck in the previous iteration of the serve for-loop.
As a result, the new request obtains a conn from the connection pool and finishes sending data, but its response handling never starts.
How filters should support streaming is also an open question~
I will find time to look at how to handle this.
Issues associated with this PR
#2320
Solutions
You should describe your solutions to the issues in this PR, including the overall design, the details, and the changes. For now, Chinese is allowed for this description.
UT result
A unit test is needed if the code is changed; it should cover boundary cases, corner cases, and some exceptional cases, and you need to show the UT result.
Benchmark
If your code is involved in processing every request, you should provide the benchmark result.
Code Style
Goimports has run
Golint result