Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDS support for R2 HTTP client #836

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ and what APIs have changed, if applicable.

## [Unreleased]


## [29.39.4-rc.1] - 2022-09-13
- Add Support for UDS transport protocol in R2 outbound traffic over HTTP1.1

## [29.39.3] - 2022-09-26
Catch exceptions when zk connection state change event is received after zk connection shutdown.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.r2.netty.handler.common.SslHandshakeTimingHandler;
import com.linkedin.r2.netty.handler.http.HttpMessageDecoders;
import com.linkedin.r2.netty.handler.http.HttpMessageEncoders;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
Expand Down Expand Up @@ -60,7 +61,7 @@
* @author Sean Sheng
* @author Nizar Mankulangara
*/
class HttpChannelInitializer extends ChannelInitializer<NioSocketChannel>
class HttpChannelInitializer extends ChannelInitializer<Channel>
{
/**
* HTTP/2 stream channels are not recyclable and should be disposed upon completion.
Expand Down Expand Up @@ -93,7 +94,7 @@ class HttpChannelInitializer extends ChannelInitializer<NioSocketChannel>
}

@Override
protected void initChannel(NioSocketChannel channel)
protected void initChannel(Channel channel)
{
if (_ssl)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.apache.commons.lang3.StringUtils;


/**
* Factory class to produce {@link AsyncPool}&#060;{@link Channel}&#062; for Http Channels
Expand All @@ -53,7 +56,9 @@ public class HttpChannelPoolFactory implements ChannelPoolFactory
private final ScheduledExecutorService _scheduler;
private final AsyncPoolImpl.Strategy _strategy;
private int _channelPoolWaiterTimeout;
private final String _udsAddress;

@Deprecated
public HttpChannelPoolFactory(
ScheduledExecutorService scheduler,
EventLoopGroup eventLoopGroup,
Expand All @@ -76,7 +81,36 @@ public HttpChannelPoolFactory(
int connectTimeout,
int sslHandShakeTimeout)
{
ChannelInitializer<NioSocketChannel> initializer = new HttpChannelInitializer(sslContext, sslParameters,
this( scheduler, eventLoopGroup, channelGroup, strategy, sslContext, sslParameters, maxPoolSize,
minPoolSize, maxPoolWaiterSize, maxInitialLineLength, maxHeaderSize, maxChunkSize,
maxConcurrentConnectionInitializations, idleTimeout, maxContentLength, tcpNoDelay, enableSSLSessionResumption,
channelPoolWaiterTimeout, connectTimeout, sslHandShakeTimeout, null);
}

public HttpChannelPoolFactory(
ScheduledExecutorService scheduler,
EventLoopGroup eventLoopGroup,
ChannelGroup channelGroup,
AsyncPoolImpl.Strategy strategy,
SSLContext sslContext,
SSLParameters sslParameters,
int maxPoolSize,
int minPoolSize,
int maxPoolWaiterSize,
int maxInitialLineLength,
int maxHeaderSize,
int maxChunkSize,
int maxConcurrentConnectionInitializations,
long idleTimeout,
long maxContentLength,
boolean tcpNoDelay,
boolean enableSSLSessionResumption,
int channelPoolWaiterTimeout,
int connectTimeout,
int sslHandShakeTimeout,
String udsAddress)
{
ChannelInitializer<Channel> initializer = new HttpChannelInitializer(sslContext, sslParameters,
maxInitialLineLength, maxHeaderSize, maxChunkSize, maxContentLength, enableSSLSessionResumption, sslHandShakeTimeout);

_scheduler = scheduler;
Expand All @@ -89,9 +123,22 @@ public HttpChannelPoolFactory(
_idleTimeout = idleTimeout;
_tcpNoDelay = tcpNoDelay;
_channelPoolWaiterTimeout = channelPoolWaiterTimeout;
_udsAddress = udsAddress;

_bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class).
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout).handler(initializer);
if (!StringUtils.isEmpty(_udsAddress)) {
_bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(EpollDomainSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(initializer);
}
else{
_bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(initializer);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public ChannelPoolManager buildStream(ChannelPoolManagerKey channelPoolManagerKe
_enableSSLSessionResumption,
_channelPoolWaiterTimeout,
_connectTimeout,
_sslHandShakeTimeout);
_sslHandShakeTimeout,
channelPoolManagerKey.getUdsAddress());
}
else
{
Expand Down