From: Tony Tkacik Date: Mon, 8 Dec 2014 09:08:11 +0000 (+0000) Subject: Merge "Fix improper cleanup of operational data in sal-netconf-connector's disconnect" X-Git-Tag: release/lithium~791 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=93154ea2573eb37ebcca4c360bf538d8d92967f9;hp=4b65b104779988ca197bbf8797afdc3e9b1e6ee4 Merge "Fix improper cleanup of operational data in sal-netconf-connector's disconnect" --- diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index 6f43768a9d..6159922183 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -52,10 +52,6 @@ org.opendaylight.controller sal-core-api - - org.opendaylight.controller - sal-core-api - org.opendaylight.controller sal-core-spi diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java index 47e96d1ff4..af196a941a 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java @@ -37,6 +37,12 @@ public abstract class AbstractProtocolSession extends SimpleChannelInboundHan public final void channelInactive(final ChannelHandlerContext ctx) { LOG.debug("Channel {} inactive.", ctx.channel()); endOfInput(); + try { + // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close channel handler of reconnect promise + super.channelInactive(ctx); + } catch (final Exception e) { + throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e); + } } @Override diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index b2ab27a826..aaec95a74b 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -47,13 +47,12 @@ final class ReconnectPromise, L extends SessionList pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer() { @Override public void initializeChannel(final SocketChannel channel, final Promise promise) { - // add closed channel handler - // This handler has to be added before initializer.initializeChannel is called - // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case - // closed channel handler is before the handler that invokes channel inactive event - channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this)); - initializer.initializeChannel(channel, promise); + // add closed channel handler + // This handler has to be added as last channel handler and the channel inactive event has to be caught by it + // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work + // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started + channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this)); } }); } @@ -91,9 +90,7 @@ final class ReconnectPromise, L extends SessionList @Override public void channelInactive(final ChannelHandlerContext ctx) throws Exception { - // Pass info about disconnect further and then reconnect - super.channelInactive(ctx); - + // This is the ultimate channel inactive handler, not forwarding if (promise.isCancelled()) { return; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index bd42a9cdd9..cbaf278a87 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -112,11 +112,6 @@ yang-common - - org.opendaylight.yangtools - yang-data-api - - org.osgi org.osgi.core diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java index 395e5c0338..0d296c5f52 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java @@ -98,6 +98,7 @@ public class NetconfMonitoringServiceImplTest { NetconfServerSessionListener sessionListener = mock(NetconfServerSessionListener.class); Channel channel = mock(Channel.class); + doReturn("mockChannel").when(channel).toString(); NetconfHelloMessageAdditionalHeader header = new NetconfHelloMessageAdditionalHeader("name", "addr", "2", "tcp", "id"); NetconfServerSession sm = new NetconfServerSession(sessionListener, channel, 10, header); doNothing().when(sessionListener).onSessionUp(any(NetconfServerSession.class)); diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultStopExiTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultStopExiTest.java index c06e78aa99..aaaf5991d4 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultStopExiTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultStopExiTest.java @@ -31,6 +31,7 @@ public class DefaultStopExiTest { DefaultStopExi exi = new DefaultStopExi(""); Document doc = XmlUtil.newDocument(); Channel channel = mock(Channel.class); + doReturn("mockChannel").when(channel).toString(); ChannelPipeline pipeline = mock(ChannelPipeline.class); doReturn(pipeline).when(channel).pipeline(); ChannelHandler channelHandler = mock(ChannelHandler.class); diff --git a/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivatorTest.java b/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivatorTest.java index 6664f3e733..792a591512 100644 --- a/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivatorTest.java +++ b/opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivatorTest.java @@ -40,6 +40,7 @@ public class NetconfMonitoringActivatorTest { ServiceReference[] refs = new ServiceReference[2]; doReturn(Arrays.asList(refs)).when(context).getServiceReferences(any(Class.class), anyString()); doReturn(refs).when(context).getServiceReferences(anyString(), anyString()); + doNothing().when(context).removeServiceListener(any(ServiceListener.class)); } @Test diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java index efa1c731c8..fd11ce8c51 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java @@ -87,6 +87,7 @@ public abstract class AbstractNetconfSession chunkSize) { - ByteBuf chunk = Unpooled.buffer(chunkSize); - chunk.writeBytes(createChunkHeader(chunkSize)); - chunk.writeBytes(msg.readBytes(chunkSize)); - ctx.write(chunk); - } - out.writeBytes(createChunkHeader(msg.readableBytes())); - out.writeBytes(msg.readBytes(msg.readableBytes())); - out.writeBytes(NetconfMessageConstants.END_OF_CHUNK); - } + protected void encode(final ChannelHandlerContext ctx, final ByteBuf msg, final ByteBuf out) { + do { + final int xfer = Math.min(chunkSize, msg.readableBytes()); + + out.writeBytes(NetconfMessageConstants.START_OF_CHUNK); + out.writeBytes(String.valueOf(xfer).getBytes(Charsets.US_ASCII)); + out.writeByte('\n'); - private ByteBuf createChunkHeader(int chunkSize) { - return Unpooled.wrappedBuffer(NetconfMessageHeader.toBytes(chunkSize)); + out.writeBytes(msg, xfer); + } while (msg.isReadable()); + + out.writeBytes(NetconfMessageConstants.END_OF_CHUNK); } } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index c8c9128282..05cd598cdc 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -176,6 +176,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { + LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise); this.connectPromise = promise; startSsh(ctx, remoteAddress); } @@ -187,23 +188,21 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { - // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources - // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430) - // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation - try { - super.disconnect(ctx, ctx.newPromise()); - } catch (final Exception e) { - LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e); - } + LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(), connectPromise); - if(sshReadAsyncListener != null) { - sshReadAsyncListener.close(); + // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic + if(connectPromise.isSuccess()) { + ctx.fireChannelInactive(); } if(sshWriteAsyncHandler != null) { sshWriteAsyncHandler.close(); } + if(sshReadAsyncListener != null) { + sshReadAsyncListener.close(); + } + if(session!= null && !session.isClosed() && !session.isClosing()) { session.close(false).addListener(new SshFutureListener() { @Override @@ -216,13 +215,17 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { }); } - // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic - if(connectPromise.isSuccess()) { - ctx.fireChannelInactive(); + // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources + // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430) + // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation + try { + // Disconnect has to be closed after inactive channel event was fired, because it interferes with it + super.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e); } channel = null; - promise.setSuccess(); LOG.debug("SSH session closed on channel: {}", ctx.channel()); } diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java index 8199963c81..7946afdbf5 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java @@ -60,6 +60,7 @@ public class AbstractNetconfSessionTest { doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class)); doReturn(pipeline).when(channel).pipeline(); + doReturn("mockChannel").when(channel).toString(); doReturn(mock(ChannelFuture.class)).when(channel).close(); doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class)); diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java index 556bece43f..4488c5e1be 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ChunkedFramingMechanismEncoderTest.java @@ -9,20 +9,16 @@ package org.opendaylight.controller.netconf.nettyutil.handler; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertTrue; +import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import java.util.List; +import java.nio.ByteBuffer; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; public class ChunkedFramingMechanismEncoderTest { @@ -48,30 +44,20 @@ public class ChunkedFramingMechanismEncoderTest { @Test public void testEncode() throws Exception { - final List chunks = Lists.newArrayList(); - doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - chunks.add((ByteBuf) invocation.getArguments()[0]); - return null; - } - }).when(ctx).write(anyObject()); - final ChunkedFramingMechanismEncoder encoder = new ChunkedFramingMechanismEncoder(chunkSize); final int lastChunkSize = 20; final ByteBuf src = Unpooled.wrappedBuffer(getByteArray(chunkSize * 4 + lastChunkSize)); final ByteBuf destination = Unpooled.buffer(); encoder.encode(ctx, src, destination); - assertEquals(4, chunks.size()); - final int framingSize = "#256\n".getBytes().length + 1/* new line at end */; + assertEquals(1077, destination.readableBytes()); - for (final ByteBuf chunk : chunks) { - assertEquals(chunkSize + framingSize, chunk.readableBytes()); - } + byte[] buf = new byte[destination.readableBytes()]; + destination.readBytes(buf); + String s = Charsets.US_ASCII.decode(ByteBuffer.wrap(buf)).toString(); - final int lastFramingSize = "#20\n".length() + NetconfMessageConstants.END_OF_CHUNK.length + 1/* new line at end */; - assertEquals(lastChunkSize + lastFramingSize, destination.readableBytes()); + assertTrue(s.startsWith("\n#256\na")); + assertTrue(s.endsWith("\n#20\naaaaaaaaaaaaaaaaaaaa\n##\n")); } private byte[] getByteArray(final int size) { diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java index 5c2770a8c1..89285d18c0 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java @@ -27,6 +27,7 @@ public final class NetconfMessageConstants { public static final int MAX_HEADER_LENGTH = 13; + public static final byte[] START_OF_CHUNK = "\n#".getBytes(Charsets.UTF_8); public static final byte[] END_OF_CHUNK = "\n##\n".getBytes(Charsets.UTF_8); }