
feat: support http stream response #2322

Open · wants to merge 2 commits into base: master
Conversation

@yejialiango (Contributor) commented May 30, 2023

Issues associated with this PR

#2320

Solutions

Describe your solution to the issues in this PR, including the overall approach, the details, and the changes. Chinese may be used for this description.

UT result

A unit test is required whenever the code is changed. Your unit tests should cover boundary cases, corner cases, and exceptional cases, and you need to show the UT result.

Benchmark

If your code is involved in processing every request, you should provide the benchmark result.

Code Style

  • Make sure Goimports has run
  • Show Golint result

@codecov codecov bot commented May 30, 2023

Codecov Report

Patch coverage: 5.92% and project coverage change: -0.41 ⚠️

Comparison is base (7d48a20) 60.45% compared to head (0760783) 60.04%.

❗ Current head 0760783 differs from the pull request's most recent head f149d8e. Consider uploading reports for the commit f149d8e to get more accurate results.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2322      +/-   ##
==========================================
- Coverage   60.45%   60.04%   -0.41%     
==========================================
  Files         422      422              
  Lines       37507    37784     +277     
==========================================
+ Hits        22673    22687      +14     
- Misses      12597    12859     +262     
- Partials     2237     2238       +1     
Impacted Files                        Coverage   Δ
pkg/metrics/upstream.go                 0.00%   <ø> (ø)
pkg/proxy/var.go                       37.86%   <ø> (ø)
pkg/stream/client.go                    0.00%   <0.00%> (ø)
pkg/stream/http/var.go                 90.76%   <ø> (ø)
pkg/types/upstream.go                  60.71%   <ø> (ø)
pkg/types/variable.go                  58.82%   <ø> (ø)
pkg/stream/http/stream.go              23.55%   <1.92%> (-11.68%) ⬇️
pkg/proxy/downstream.go                57.42%   <22.22%> (-0.39%) ⬇️
pkg/upstream/cluster/loadbalancer.go   83.33%   <77.77%> (-0.18%) ⬇️
pkg/upstream/cluster/stats.go          93.54%   <100.00%> (+0.32%) ⬆️

... and 8 files with indirect coverage changes


feat: support http1 stream response

style: simplify code
@jizhuozhi added the kind/enhancement (improvement for an existing feature, not a bug), area/proxy, and area/stream labels on Jun 7, 2023
@huanglu20124

I tried merging this code and tested the large-file download scenario; the high memory usage problem is still there. The streaming code is modeled on fasthttp: fasthttp implements segmented reads and writes, but it still uses one buffer to cache the entire response body, so high memory usage cannot be avoided. I think handleChunk can be optimized further: each time a segment is read, write it into a temporary buffer that only holds the data from that read; once it has been written to the output stream, the data can be discarded.
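A minimal sketch of that idea (the helper name and the 32 KiB size are assumptions, not the PR's code): one fixed-size scratch buffer is reused for every read, so at most one chunk is held in memory at a time regardless of response size.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// copyChunked reuses a single scratch buffer: each read's data is handed to
// the output stream and then forgotten, so memory stays bounded.
func copyChunked(r *bufio.Reader, write func([]byte) error) error {
	scratch := make([]byte, 32*1024)
	for {
		n, err := r.Read(scratch)
		if n > 0 {
			if werr := write(scratch[:n]); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil // upstream finished cleanly
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	src := bufio.NewReader(strings.NewReader("some response body"))
	_ = copyChunked(src, func(b []byte) error {
		fmt.Printf("%s", b)
		return nil
	})
}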

@jizhuozhi (Contributor) left a comment

fasthttp may need dst to hold the full HTTP response body, but in our scenario we do not care about the full response body, so the buffer is only used to copy some bytes from the upstream to MOSN and write them out to the downstream.


// copy from fasthttp github.com/valyala/fasthttp@v1.40.0/http.go:1374
func writeBodyToPipe(r *bufio.Reader, maxBodySize int, contentLength int, writeFunc func([]byte) error) (err error) {
	buf := make([]byte, 0)
@jizhuozhi (Contributor) commented:

Perf: give an initial capacity to avoid frequent growth.
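In code, the suggestion amounts to something like this (4 KiB is an assumed value, not one from the PR):

package main

import "fmt"

func main() {
	// buf := make([]byte, 0)    // capacity 0: early appends reallocate repeatedly
	buf := make([]byte, 0, 4096) // pre-sized: appends grow in place at first
	buf = append(buf, "chunk"...)
	fmt.Println(len(buf), cap(buf)) // prints: 5 4096
}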

@yejialiango (Author)

ok

}

// copy from fasthttp github.com/valyala/fasthttp@v1.40.0/http.go:1374
func writeBodyToPipe(r *bufio.Reader, maxBodySize int, contentLength int, writeFunc func([]byte) error) (err error) {
@jizhuozhi (Contributor) commented:

maxBodySize seems to always be 0; do we need this argument?

return dst, nil
}

offset := len(dst)
@jizhuozhi (Contributor) commented:

Memory: why should we always append to the end of the buffer rather than start at offset 0?

@yejialiango (Author)

Thanks. I will reevaluate and optimize this.

@huanglu20124

Hi all, I found another problem. When the clientStreamConnection object passes the response body to the serverStream object, it uses MOSN's pipeBuffer; the clientStreamConnection acts as the consumer and the serverStream as the producer. In practice, when MOSN runs as the inbound-traffic sidecar, the production rate can far exceed the consumption rate (because local communication is much faster than remote communication), so the pipeBuffer keeps growing and eventually uses too much memory.

@yejialiango (Author)

> When the clientStreamConnection object passes the response body to the serverStream object, it uses MOSN's pipeBuffer ... the pipeBuffer keeps growing and eventually uses too much memory.

I suspect two factors are at play:

  1. The pipeBuffer is initialized with capacity 0, which causes frequent growth.
  2. A full content-length worth of data is currently read before anything is written to the pipeBuffer.

I will try changing how writeBodyFixedSize writes data back, streaming it back in fixed-size chunks; a sketch of that change follows.
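A hedged sketch of what that change could look like (the helper name and the 16 KiB chunk size are assumptions, not the PR's actual code): instead of reading all contentLength bytes before writing, read and write back in fixed-size pieces.

package main

import (
	"bufio"
	"io"
	"os"
	"strings"
)

// writeBodyFixedSizeStreaming reads a body of known length in bounded chunks
// and hands each chunk to write immediately, so the sink receives data as it
// arrives instead of one content-length-sized block.
func writeBodyFixedSizeStreaming(r *bufio.Reader, contentLength int, write func([]byte) error) error {
	const chunkSize = 16 * 1024 // assumed; tune to the proxy's buffer sizing
	buf := make([]byte, chunkSize)
	for remaining := contentLength; remaining > 0; {
		n := chunkSize
		if remaining < n {
			n = remaining
		}
		read, err := io.ReadFull(r, buf[:n])
		if read > 0 {
			if werr := write(buf[:read]); werr != nil {
				return werr
			}
			remaining -= read
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	body := strings.Repeat("x", 100)
	r := bufio.NewReader(strings.NewReader(body))
	_ = writeBodyFixedSizeStreaming(r, len(body), func(b []byte) error {
		_, err := os.Stdout.Write(b)
		return err
	})
}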

@huanglu20124

Thanks for the reply~ I have already tried both of those approaches, though; they help to some extent, but they do not fundamentally solve the pipeBuffer growth problem. Our scenario uses MOSN as a sidecar proxying inbound traffic, and when downloading large files the pipeBuffer still grows too fast.

}
data := buf[:n]
log.Proxy.Debugf(s.ctx, "[stream] [http] [stream response] receive data: %s", string(data))
if err = s.connection.conn.Write(buffer.NewIoBufferBytes(data)); err != nil {

This code reads data from the pipeBuffer and returns it to the client. If the client is a remote machine, this write may take a long time, so the pipeBuffer is consumed far more slowly than it is filled and grows too fast. My thought is to either throttle how fast clientStream writes into the pipeBuffer, or drop the pipeBuffer entirely and make the data hand-off synchronous instead of asynchronous, with clientStream passing each packet directly to ServerStream to return.

@jizhuozhi (Contributor) commented:

As I understand it, what is actually needed here is backpressure: when the consumption rate drops, the production rate is actively reduced to match.
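A minimal sketch of that backpressure idea (this is an illustration, not MOSN's pipeBuffer API): a channel with a small fixed capacity blocks the producer once the consumer falls behind, so the buffer can never grow without bound.

package main

import "fmt"

// boundedPipe caps how many chunks can be in flight. When the consumer is
// slow, the channel fills and Write blocks: that blocking is the
// backpressure signal that slows the upstream read loop down.
type boundedPipe struct{ ch chan []byte }

func newBoundedPipe(depth int) *boundedPipe {
	return &boundedPipe{ch: make(chan []byte, depth)}
}

func (p *boundedPipe) Write(b []byte) {
	chunk := append([]byte(nil), b...) // copy; the caller may reuse b
	p.ch <- chunk                      // blocks when depth chunks are pending
}

func (p *boundedPipe) Close() { close(p.ch) }

func main() {
	p := newBoundedPipe(4)
	go func() {
		for i := 0; i < 8; i++ {
			p.Write([]byte(fmt.Sprintf("chunk-%d", i)))
		}
		p.Close()
	}()
	for chunk := range p.ch { // consumer drains at its own pace
		fmt.Println(string(chunk))
	}
}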

Yes, exactly.

@yejialiango (Author)

I think we still need to weigh the scenarios here:

  1. If we switch to a synchronous mode, the data piles up in the connection to the upstream.
  2. If we keep the current approach and transfer data through the pipeBuffer, the data may pile up inside MOSN.

@jizhuozhi do you have any suggestions?

@jizhuozhi (Contributor) commented Jun 13, 2023

I just took a look at fasthttp: streaming support was already merged in valyala/fasthttp#911, so let's see whether we can build directly on top of that feature.
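For reference, a hedged sketch of consuming a streamed response with that fasthttp feature, assuming the Client.StreamResponseBody option and Response.BodyStream() accessor introduced by that work (check the exact fields against your fasthttp version); the URL is a placeholder.

package main

import (
	"io"
	"log"
	"os"

	"github.com/valyala/fasthttp"
)

func main() {
	client := &fasthttp.Client{
		StreamResponseBody: true, // expose the body as a stream instead of buffering it whole
	}

	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI("http://example.com/large-file") // placeholder URL

	if err := client.Do(req, resp); err != nil {
		log.Fatal(err)
	}
	// Drain the body incrementally; peak memory is bounded by io.Copy's
	// internal buffer rather than the full content length.
	if _, err := io.Copy(os.Stdout, resp.BodyStream()); err != nil {
		log.Fatal(err)
	}
}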

@yejialiango (Author)

Thanks, I will take a look.

@yejialiango (Author)

This can be done with fasthttp's SetBodyStream; I will rewrite it that way.
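A sketch of the response side under that plan, using fasthttp's Response.SetBodyStream (the io.Pipe producer here is hypothetical; in the PR it would be fed by the upstream read loop). Passing -1 as the size means the length is unknown, so chunked transfer encoding is used, and a blocked pipe write naturally throttles the producer.

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/valyala/fasthttp"
)

func handler(ctx *fasthttp.RequestCtx) {
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close()
		// Hypothetical producer: in MOSN this would be the loop reading
		// from the upstream. pw.Write blocks until fasthttp has consumed
		// the previous chunk, which gives backpressure for free.
		for i := 0; i < 4; i++ {
			fmt.Fprintf(pw, "chunk-%d\n", i)
		}
	}()
	ctx.Response.SetBodyStream(pr, -1) // -1: unknown length, chunked encoding
}

func main() {
	log.Fatal(fasthttp.ListenAndServe(":8080", handler))
}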

@adamzhoul

Reporting a problem:

OnReceive is wrapped here. In practice, w.stream.DestroyStream() is called first, which returns the client (conn) to the connPool; only then is w.streamReceiver.OnReceive called to deliver the data.

mosn/pkg/stream/client.go

Lines 177 to 180 in cafbd5c

func (w *clientStreamReceiverWrapper) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {
	w.stream.DestroyStream()
	w.streamReceiver.OnReceive(ctx, headers, data, trailers)
}

This leads to a problem: when a new request comes in, the previous stream response has not finished yet, so handling is stuck in the previous iteration of the serve loop:

func (conn *clientStreamConnection) serve() {
	for {
		sendStreamResponse(s) // still streaming the previous response
	}
}

As a result, the new request obtains the conn from the connection pool and finishes sending its data, but its response is never processed.
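One hypothetical way to illustrate a fix (not something this PR commits to): deliver the data before destroying the stream, so the conn only goes back to connPool after the streamed response has been handed off.

// Hypothetical reordering of the wrapper above, for illustration only:
func (w *clientStreamReceiverWrapper) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {
	// deliver first; for a streamed body this call should not return until
	// the receiver no longer needs the underlying conn
	w.streamReceiver.OnReceive(ctx, headers, data, trailers)
	// destroy last, so the conn is returned to connPool only afterwards
	w.stream.DestroyStream()
}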

@taoyuanyuan (Contributor)

How filters should support streaming is also an open question.

@yejialiango (Author)

I will find some time to look into how to handle this.

Labels: area/proxy, area/stream, kind/enhancement (improvement for an existing feature, not a bug)

5 participants