添加URL
分享

I am trying to create a proxy and then pass all the traffic on to other handlers in Netty. I understand that I should manage the references to the ByteBuf, but I cannot work out how to do it. My example and the exception are below.

Initializer:

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
    private final SslContext sslCtx;
    private final String remoteHost;
    private final int remotePort;
    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = sslCtx;
    public HexDumpProxyInitializer(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = null;
    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
        p.addLast(new InboundPrinterHandler());

HexDumpProxyFrontendHandler

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
    private final String remoteHost;
    private final int remotePort;
    private Channel outboundChannel;
    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();
        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
        ctx.fireChannelRead(msg);
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
   static void closeOnFlush(Channel ch) {
       if (ch.isActive()) {
           ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

InboundPrinterHandler

public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf bb = null;
        bb = (ByteBuf) msg;
        System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset()));
        System.out.println("\n\n\n");
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

The Exception

io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
    at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
    at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
    // Excerpt from HexDumpProxyFrontendHandler.channelRead showing the two uses of msg.
    if (outboundChannel.isActive()) {
        // First use: msg is handed over to the outbound channel.
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        // Snip
    // Second use: the same msg is passed to the next inbound handler, with no
    // retain() in between.
    ctx.fireChannelRead(msg);

After you pass the ByteBuf to the other channel, it becomes that channel's responsibility to decrement the reference count. Once the other channel has written the buffer and decremented its reference count to zero, the buffer is no longer usable, which is why the subsequent call to ctx.fireChannelRead(msg) fails.

The best way to solve this is to manually increment the reference count before you pass the buffer to the other channel, using .retain():

// msg is declared as Object in channelRead, so retain() cannot be called on it
// directly; ReferenceCountUtil.retain() increments the count when msg is
// reference-counted and returns it.
outboundChannel.writeAndFlush(io.netty.util.ReferenceCountUtil.retain(msg)).addListener(new ChannelFutureListener() {
// Your remaining code
        

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.

site design / logo © 2020 Stack Exchange Inc; user contributions licensed under cc by-sa 4.0 with attribution required. rev 2020.2.13.36069