Fix channelInactive event handling in the netty pipeline for netconf. 17/13417/1
authorMaros Marsalek <mmarsale@cisco.com>
Fri, 5 Dec 2014 13:28:31 +0000 (14:28 +0100)
committerMaros Marsalek <mmarsale@cisco.com>
Fri, 5 Dec 2014 13:59:14 +0000 (14:59 +0100)
Reoreding events from AsyncSshHandler.
+ Moving ClosedChannelHandler from Reconnect promise at the end of the pipeline.
+ Add channel id to toString() of abstract netconf session

Change-Id: I9884c2b8d8b2d89878e2fe5cb43fda7a98be5b23
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultStopExiTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java

index 47e96d1ff490df392accfb84dc10d55eb84a4583..af196a941a777df18eacc9d2929a3048e57677ab 100644 (file)
@@ -37,6 +37,12 @@ public abstract class AbstractProtocolSession<M> extends SimpleChannelInboundHan
     public final void channelInactive(final ChannelHandlerContext ctx) {
         LOG.debug("Channel {} inactive.", ctx.channel());
         endOfInput();
+        try {
+            // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close channel handler of reconnect promise
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
     }
 
     @Override
index b2ab27a82671cdc0c2380ac8f1084e540ee61691..aaec95a74b5886150c8d0bc4bb7d11a1a1570dbf 100644 (file)
@@ -47,13 +47,12 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
         pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                // add closed channel handler
-                // This handler has to be added before initializer.initializeChannel is called
-                // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
-                // closed channel handler is before the handler that invokes channel inactive event
-                channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
-
                 initializer.initializeChannel(channel, promise);
+                // add closed channel handler
+                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+                channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
             }
         });
     }
@@ -91,9 +90,7 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
 
         @Override
         public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-            // Pass info about disconnect further and then reconnect
-            super.channelInactive(ctx);
-
+            // This is the ultimate channel inactive handler, not forwarding
             if (promise.isCancelled()) {
                 return;
             }
index 395e5c03384abad0e988789d5bf5d42f8db42d9c..0d296c5f5269c0662f9b27db9d6ef1973038d8c8 100644 (file)
@@ -98,6 +98,7 @@ public class NetconfMonitoringServiceImplTest {
 
         NetconfServerSessionListener sessionListener = mock(NetconfServerSessionListener.class);
         Channel channel = mock(Channel.class);
+        doReturn("mockChannel").when(channel).toString();
         NetconfHelloMessageAdditionalHeader header = new NetconfHelloMessageAdditionalHeader("name", "addr", "2", "tcp", "id");
         NetconfServerSession sm = new NetconfServerSession(sessionListener, channel, 10, header);
         doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class));
index c06e78aa997cfaafdc5dd2624d784a33bb047364..aaaf5991d425d762ab1afa8303ae69d86c651c36 100644 (file)
@@ -31,6 +31,7 @@ public class DefaultStopExiTest {
         DefaultStopExi exi = new DefaultStopExi("");
         Document doc = XmlUtil.newDocument();
         Channel channel = mock(Channel.class);
+        doReturn("mockChannel").when(channel).toString();
         ChannelPipeline pipeline = mock(ChannelPipeline.class);
         doReturn(pipeline).when(channel).pipeline();
         ChannelHandler channelHandler = mock(ChannelHandler.class);
index efa1c731c8a8b239f18dd68d850aff3914bf7b62..fd11ce8c51875e379976917fd722ce6c5797d0ec 100644 (file)
@@ -87,6 +87,7 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
     public String toString() {
         final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
         sb.append("sessionId=").append(sessionId);
+        sb.append(", channel=").append(channel);
         sb.append('}');
         return sb.toString();
     }
index c8c912828279e72eab9ac12dba25e2a29e2c10d4..05cd598cdc22f7b1265c661c447b850cab1a0256 100644 (file)
@@ -176,6 +176,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
+        LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
         this.connectPromise = promise;
         startSsh(ctx, remoteAddress);
     }
@@ -187,23 +188,21 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
-        // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources
-        // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
-        // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation
-        try {
-            super.disconnect(ctx, ctx.newPromise());
-        } catch (final Exception e) {
-            LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
-        }
+        LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(), connectPromise);
 
-        if(sshReadAsyncListener != null) {
-            sshReadAsyncListener.close();
+        // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+        if(connectPromise.isSuccess()) {
+            ctx.fireChannelInactive();
         }
 
         if(sshWriteAsyncHandler != null) {
             sshWriteAsyncHandler.close();
         }
 
+        if(sshReadAsyncListener != null) {
+            sshReadAsyncListener.close();
+        }
+
         if(session!= null && !session.isClosed() && !session.isClosing()) {
             session.close(false).addListener(new SshFutureListener<CloseFuture>() {
                 @Override
@@ -216,13 +215,17 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             });
         }
 
-        // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
-        if(connectPromise.isSuccess()) {
-            ctx.fireChannelInactive();
+        // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources
+        // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
+        // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation
+        try {
+            // Disconnect has to be closed after inactive channel event was fired, because it interferes with it
+            super.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
         }
 
         channel = null;
-
         promise.setSuccess();
         LOG.debug("SSH session closed on channel: {}", ctx.channel());
     }
index 8199963c8112d8e708c3d4076c23eada3e3604fe..7946afdbf5e1c80add226b08137b1faedd3bf6bf 100644 (file)
@@ -60,6 +60,7 @@ public class AbstractNetconfSessionTest {
 
         doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
         doReturn(pipeline).when(channel).pipeline();
+        doReturn("mockChannel").when(channel).toString();
         doReturn(mock(ChannelFuture.class)).when(channel).close();
 
         doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));