Netty最简单的代理转发

/ 后端 / 没有评论 / 255浏览

客户端 --> netty服务器 --> 目标服务器 (发送数据)

客户端 <-- netty服务器 <-- 目标服务器 (响应数据)

package com.kakuiwong.handle;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;

/**
 * Minimal forward proxy built on Netty.
 *
 * <p>Data flow: client --&gt; this proxy --&gt; target server (request) and
 * client &lt;-- this proxy &lt;-- target server (response).
 *
 * <p>Two modes are handled by the same per-connection handler instance:
 * <ul>
 *   <li><b>Plain HTTP</b>: each aggregated {@link FullHttpRequest} is forwarded over a
 *       fresh connection to the host named in the {@code Host} header, and the aggregated
 *       response is written back to the client with an extra {@code proxy} header.</li>
 *   <li><b>CONNECT tunnel</b>: a {@code CONNECT} request is answered with {@code 200},
 *       the HTTP handlers are removed from the client pipeline, and all following
 *       messages are relayed unchanged to the target recorded from that request.</li>
 * </ul>
 *
 * <p>A new instance is installed per accepted channel by {@link #start(Integer)}; the
 * mutable fields below are per-connection state and are only accessed from handler
 * callbacks and from listeners on channels registered with the same event loop.
 */
public class EasyProxyHandle extends ChannelInboundHandlerAdapter {

    /** Connect future of the tunnel's outbound channel; {@code null} until the first tunnelled message. */
    private ChannelFuture cf;
    /** Target host taken from the most recent request's {@code Host} header. */
    private String host;
    /** Target port taken from the most recent request (explicit port, else 80/443). */
    private int port;

    /**
     * Starts the proxy server and blocks the calling thread until the server channel closes.
     *
     * <p>Failures while binding or waiting are reported on stderr rather than thrown.
     * Both event loop groups are shut down before this method returns.
     *
     * @param port local TCP port to listen on
     */
    public void start(Integer port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            // Handler names are referenced by http() when switching to tunnel mode.
                            ch.pipeline().addLast("httpCodec", new HttpServerCodec());
                            ch.pipeline().addLast("httpObject", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("serverHandle", new EasyProxyHandle());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Dispatches an inbound message: aggregated HTTP requests go to {@link #http}, anything
     * else (raw data seen once the HTTP handlers have been removed) goes to {@link #https}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            http(ctx, msg);
        } else {
            https(ctx, msg);
        }
    }

    /** Closes the tunnel's outbound channel, if any, when the client goes away. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (cf != null) {
            cf.channel().close();
        }
        super.channelInactive(ctx);
    }

    /** Reports the error and closes the client channel; {@link #channelInactive} then closes the tunnel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Relays one tunnelled message to the target server, connecting lazily on the first call.
     *
     * @param ctx client-side handler context
     * @param msg raw message read from the client; ownership passes to the outbound write
     */
    private void https(ChannelHandlerContext ctx, Object msg) {
        if (cf == null) {
            if (host == null) {
                // No preceding request established a target; there is nowhere to relay to.
                ReferenceCountUtil.release(msg);
                ctx.channel().close();
                return;
            }
            Bootstrap bootstrap = new Bootstrap();
            // Reuse the client's event loop rather than allocating a new thread pool per tunnel.
            bootstrap.group(ctx.channel().eventLoop())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext remoteCtx, Object data) throws Exception {
                                    ctx.channel().writeAndFlush(data);
                                }

                                @Override
                                public void channelInactive(ChannelHandlerContext remoteCtx) throws Exception {
                                    // Target closed the tunnel: close the client side too.
                                    ctx.channel().close();
                                    super.channelInactive(remoteCtx);
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext remoteCtx, Throwable cause) {
                                    cause.printStackTrace();
                                    remoteCtx.close();
                                }
                            });
                        }
                    });
            cf = bootstrap.connect(host, port);
        }
        // Chain every write on the connect future so that data arriving while the connect
        // is still pending is sent, in arrival order, once the channel is usable.
        cf.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                future.channel().writeAndFlush(msg);
            } else {
                ReferenceCountUtil.release(msg);
                ctx.channel().close();
            }
        });
    }

    /**
     * Handles one aggregated HTTP request: records the target, then either switches the
     * connection into tunnel mode (CONNECT) or forwards the request to the target server.
     *
     * @param ctx client-side handler context
     * @param msg the inbound message; must be a {@link FullHttpRequest}
     */
    private void http(ChannelHandlerContext ctx, Object msg) {
        FullHttpRequest request = (FullHttpRequest) msg;
        String hostHeader = request.headers().get("host");
        if (request.decoderResult().isFailure() || hostHeader == null || hostHeader.isEmpty()) {
            reject(ctx, request);
            return;
        }

        // Determine the port number. Only a colon after any closing bracket separates the
        // port, so IPv6 literals such as "[::1]" or "[::1]:8080" are split correctly.
        int colon = hostHeader.lastIndexOf(':');
        boolean hasPort = colon > hostHeader.lastIndexOf(']');
        final String targetHost = hasPort ? hostHeader.substring(0, colon) : hostHeader;
        int parsedPort = 80;
        if (hasPort) {
            try {
                parsedPort = Integer.parseInt(hostHeader.substring(colon + 1));
            } catch (NumberFormatException e) {
                reject(ctx, request);
                return;
            }
        } else if (request.uri().startsWith("https")) {
            parsedPort = 443;
        }
        if (targetHost.isEmpty() || parsedPort < 0 || parsedPort > 65535) {
            reject(ctx, request);
            return;
        }
        final int targetPort = parsedPort;

        this.host = targetHost;
        this.port = targetPort;

        // HTTPS: complete the proxy handshake, then stop parsing HTTP on this connection.
        if ("CONNECT".equalsIgnoreCase(request.method().name())) {
            // The request is consumed here and not forwarded, so release it.
            request.release();
            HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            // Write before removing the codec so the response is still HTTP-encoded.
            ctx.writeAndFlush(response);
            ctx.pipeline().remove("httpCodec");
            ctx.pipeline().remove("httpObject");
            return;
        }

        // Connect to the target server.
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(ctx.channel().eventLoop())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new HttpClientCodec());
                        ch.pipeline().addLast(new HttpObjectAggregator(6553600));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext remoteCtx, Object remoteMsg) throws Exception {
                                FullHttpResponse response = (FullHttpResponse) remoteMsg;
                                // Tag the HTTP response before returning it to the client.
                                response.headers().add("proxy", "来自代理服务器");
                                ctx.channel().writeAndFlush(remoteMsg);
                                // One outbound connection is opened per request, so it is
                                // finished once the full response has been handed over.
                                remoteCtx.close();
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext remoteCtx, Throwable cause) {
                                cause.printStackTrace();
                                remoteCtx.close();
                                ctx.channel().close();
                            }
                        });
                    }
                });

        ChannelFuture connectFuture = bootstrap.connect(targetHost, targetPort);
        connectFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                // The outbound write takes ownership of (and releases) the request.
                future.channel().writeAndFlush(request);
            } else {
                request.release();
                ctx.channel().close();
            }
        });
    }

    /** Releases an unusable request, answers {@code 400 Bad Request} and closes the client channel. */
    private static void reject(ChannelHandlerContext ctx, FullHttpRequest request) {
        request.release();
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}