From 1d364bf55905f619173d65a4823c6d594c5f29fe Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 9 Jul 2014 10:05:23 +0200 Subject: [PATCH] BUG-190 Simplify reconnect logic in protocol-framework. - Removed duplicate code from ReconnectPromise (almost the same code in ProtocolSessionPromise) - ReconnectPromise now only calls ProtocolSessionPromise.connect (once right away to connect initially, then every time channel is closed by dropped session) - Removed second ReconnectingStrategyFactory from ReconnectPromise and AbstractDispatcher.createReconnectingClient (Kept in AbstractDispatcher for backwards compatibility as deprecated). Only one factory needed for reconnecting client - Added unit tests to ServerTest to test different cases of reconnecting - Slightly refactored ProtocolSessionPromise to make code more readable Change-Id: If3af8f468e7d59822c984cf814d15460ab35921f Signed-off-by: Maros Marsalek --- .../commons/protocol-framework/pom.xml | 5 + .../framework/AbstractDispatcher.java | 90 ++++-- .../framework/ProtocolSessionPromise.java | 151 +++++----- .../protocol/framework/ReconnectPromise.java | 178 ++++-------- .../protocol/framework/SessionListener.java | 2 +- .../protocol/framework/ServerTest.java | 271 +++++++++++++++--- .../protocol/framework/SimpleDispatcher.java | 4 + 7 files changed, 442 insertions(+), 259 deletions(-) diff --git a/opendaylight/commons/protocol-framework/pom.xml b/opendaylight/commons/protocol-framework/pom.xml index f70698731a..774bc7c23f 100644 --- a/opendaylight/commons/protocol-framework/pom.xml +++ b/opendaylight/commons/protocol-framework/pom.xml @@ -91,6 +91,11 @@ netty-event-executor-config test + + ch.qos.logback + logback-classic + test + diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index a62bd7da06..a05d02cd09 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -7,12 +7,19 @@ */ package org.opendaylight.protocol.framework; +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -28,13 +35,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import java.io.Closeable; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the * start method that will handle sockets in different thread. @@ -155,7 +155,7 @@ public abstract class AbstractDispatcher, L extends */ protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer initializer) { final Bootstrap b = new Bootstrap(); - final ProtocolSessionPromise p = new ProtocolSessionPromise(executor, address, strategy, b); + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, b); b.option(ChannelOption.SO_KEEPALIVE, true).handler( new ChannelInitializer() { @Override @@ -165,18 +165,36 @@ public abstract class AbstractDispatcher, L extends }); customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + + p.connect(); + LOG.debug("Client created."); + return p; + } + private void setWorkerGroup(final Bootstrap b) { if (b.group() == null) { b.group(workerGroup); } + } - // There is no way to detect if this was already set by - // customizeBootstrap() - try { - b.channel(NioSocketChannel.class); - } catch (IllegalStateException e) { - LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); - } + /** + * Create a client but use a pre-configured bootstrap. + * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved. + * + * @param address remote address + */ + protected Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap, final PipelineInitializer initializer) { + final ProtocolSessionPromise p = new ProtocolSessionPromise<>(executor, address, strategy, bootstrap); + + bootstrap.handler( + new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel ch) { + initializer.initializeChannel(ch, p); + } + }); p.connect(); LOG.debug("Client created."); @@ -195,6 +213,9 @@ public abstract class AbstractDispatcher, L extends } /** + * + * @deprecated use {@link org.opendaylight.protocol.framework.AbstractDispatcher#createReconnectingClient(java.net.InetSocketAddress, ReconnectStrategyFactory, org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer)} with only one reconnectStrategyFactory instead. + * * Creates a client. * * @param address remote address @@ -204,15 +225,47 @@ public abstract class AbstractDispatcher, L extends * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. * success if it indicates no further attempts should be made and failure if it reports an error */ + @Deprecated protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, final PipelineInitializer initializer) { + return createReconnectingClient(address, connectStrategyFactory, initializer); + } - final ReconnectPromise p = new ReconnectPromise(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer); - p.connect(); + /** + * Creates a reconnecting client. + * + * @param address remote address + * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt + * + * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. + * success if it indicates no further attempts should be made and failure if it reports an error + */ + protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, + final PipelineInitializer initializer) { + final Bootstrap b = new Bootstrap(); + + final ReconnectPromise p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer); + + b.option(ChannelOption.SO_KEEPALIVE, true); + customizeBootstrap(b); + setWorkerGroup(b); + setChannelFactory(b); + + p.connect(); return p; } + private void setChannelFactory(final Bootstrap b) { + // There is no way to detect if this was already set by + // customizeBootstrap() + try { + b.channel(NioSocketChannel.class); + } catch (final IllegalStateException e) { + LOG.trace("Not overriding channelFactory on bootstrap {}", b, e); + } + } + /** * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()} */ @@ -225,5 +278,4 @@ public abstract class AbstractDispatcher, L extends this.bossGroup.shutdownGracefully(); } } - } diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java index a78274cca0..a38db61ead 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java @@ -7,6 +7,7 @@ */ package org.opendaylight.protocol.framework; +import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -16,17 +17,12 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; - import java.net.InetSocketAddress; - import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - @ThreadSafe final class ProtocolSessionPromise> extends DefaultPromise { private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class); @@ -54,72 +50,12 @@ final class ProtocolSessionPromise> extends Default LOG.debug("Promise {} attempting connect for {}ms", lock, timeout); this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); - this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture cf) throws Exception { - synchronized (lock) { - - LOG.debug("Promise {} connection resolved", lock); - - // Triggered when a connection attempt is resolved. - Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf)); - - /* - * The promise we gave out could have been cancelled, - * which cascades to the connect getting cancelled, - * but there is a slight race window, where the connect - * is already resolved, but the listener has not yet - * been notified -- cancellation at that point won't - * stop the notification arriving, so we have to close - * the race here. - */ - if (isCancelled()) { - if (cf.isSuccess()) { - LOG.debug("Closing channel for cancelled promise {}", lock); - cf.channel().close(); - } - return; - } - - if (!cf.isSuccess()) { - LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause()); - - final Future rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause()); - rf.addListener(new FutureListener() { - @Override - public void operationComplete(final Future sf) { - synchronized (lock) { - // Triggered when a connection attempt is to be made. - Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf)); - - /* - * The promise we gave out could have been cancelled, - * which cascades to the reconnect attempt getting - * cancelled, but there is a slight race window, where - * the reconnect attempt is already enqueued, but the - * listener has not yet been notified -- if cancellation - * happens at that point, we need to catch it here. - */ - if (!isCancelled()) { - if (sf.isSuccess()) { - connect(); - } else { - setFailure(sf.cause()); - } - } - } - } - }); - - ProtocolSessionPromise.this.pending = rf; - } else { - LOG.debug("Promise {} connection successful", lock); - } - } - } - }); + final ChannelFuture connectFuture = this.b.connect(this.address); + // Add listener that attempts reconnect by invoking this method again. + connectFuture.addListener(new BootstrapConnectListener(lock)); + this.pending = connectFuture; } catch (final Exception e) { - LOG.info("Failed to connect to {}", e); + LOG.info("Failed to connect to {}", address, e); setFailure(e); } } @@ -140,4 +76,79 @@ final class ProtocolSessionPromise> extends Default this.strategy.reconnectSuccessful(); return super.setSuccess(result); } + + private class BootstrapConnectListener implements ChannelFutureListener { + private final Object lock; + + public BootstrapConnectListener(final Object lock) { + this.lock = lock; + } + + @Override + public void operationComplete(final ChannelFuture cf) throws Exception { + synchronized (lock) { + + LOG.debug("Promise {} connection resolved", lock); + + // Triggered when a connection attempt is resolved. + Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf)); + + /* + * The promise we gave out could have been cancelled, + * which cascades to the connect getting cancelled, + * but there is a slight race window, where the connect + * is already resolved, but the listener has not yet + * been notified -- cancellation at that point won't + * stop the notification arriving, so we have to close + * the race here. + */ + if (isCancelled()) { + if (cf.isSuccess()) { + LOG.debug("Closing channel for cancelled promise {}", lock); + cf.channel().close(); + } + return; + } + + if(cf.isSuccess()) { + LOG.debug("Promise {} connection successful", lock); + return; + } + + LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause()); + + final Future rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause()); + rf.addListener(new ReconnectingStrategyListener()); + ProtocolSessionPromise.this.pending = rf; + } + } + + private class ReconnectingStrategyListener implements FutureListener { + @Override + public void operationComplete(final Future sf) { + synchronized (lock) { + // Triggered when a connection attempt is to be made. + Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf)); + + /* + * The promise we gave out could have been cancelled, + * which cascades to the reconnect attempt getting + * cancelled, but there is a slight race window, where + * the reconnect attempt is already enqueued, but the + * listener has not yet been notified -- if cancellation + * happens at that point, we need to catch it here. + */ + if (!isCancelled()) { + if (sf.isSuccess()) { + connect(); + } else { + setFailure(sf.cause()); + } + } + } + } + } + + } + } diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index 1fa6a81753..fe1012f443 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -7,176 +7,100 @@ */ package org.opendaylight.protocol.framework; -import io.netty.channel.ChannelFuture; +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; - -import java.io.Closeable; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class ReconnectPromise, L extends SessionListener> extends DefaultPromise { + private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class); + private final AbstractDispatcher dispatcher; private final InetSocketAddress address; private final ReconnectStrategyFactory strategyFactory; - private final ReconnectStrategy strategy; - private final PipelineInitializer initializer; + private final Bootstrap b; + private final AbstractDispatcher.PipelineInitializer initializer; private Future pending; - private final AtomicBoolean negotiationFinished = new AtomicBoolean(false); - public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher dispatcher, final InetSocketAddress address, - final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, - final PipelineInitializer initializer) { + final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer initializer) { super(executor); + this.b = b; + this.initializer = Preconditions.checkNotNull(initializer); this.dispatcher = Preconditions.checkNotNull(dispatcher); this.address = Preconditions.checkNotNull(address); this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory); - this.strategy = Preconditions.checkNotNull(reestablishStrategy); - this.initializer = Preconditions.checkNotNull(initializer); } - // FIXME: BUG-190: refactor - synchronized void connect() { - negotiationFinished.set(false); - final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); - final ReconnectStrategy rs = new ReconnectStrategy() { - @Override - public Future scheduleReconnect(final Throwable cause) { - return cs.scheduleReconnect(cause); - } - @Override - public void reconnectSuccessful() { - cs.reconnectSuccessful(); - } - - @Override - public int getConnectTimeout() throws Exception { - final int cst = cs.getConnectTimeout(); - final int rst = ReconnectPromise.this.strategy.getConnectTimeout(); - - if (cst == 0) { - return rst; - } - if (rst == 0) { - return cst; - } - return Math.min(cst, rst); - } - }; - - final Future cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer() { + // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts + pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer() { @Override public void initializeChannel(final SocketChannel channel, final Promise promise) { - addChannelClosedListener(channel.closeFuture()); initializer.initializeChannel(channel, promise); + + // add closed channel handler + channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this)); } }); + } - final Object lock = this; - this.pending = cf; + /** + * + * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed + */ + private boolean isInitialConnectFinished() { + Preconditions.checkNotNull(pending); + return pending.isDone() && pending.isSuccess(); + } - cf.addListener(new FutureListener() { + @Override + public synchronized boolean cancel(final boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + Preconditions.checkNotNull(pending); + this.pending.cancel(mayInterruptIfRunning); + return true; + } - @Override - public void operationComplete(final Future future) { - synchronized (lock) { - if (!future.isSuccess()) { - final Future rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause()); - - if(rf == null) { - // This should reflect: no more reconnecting strategies, enough - // Currently all reconnect strategies fail with exception, should return null - return; - } - - ReconnectPromise.this.pending = rf; - - rf.addListener(new FutureListener() { - @Override - public void operationComplete(final Future sf) { - synchronized (lock) { - /* - * The promise we gave out could have been cancelled, - * which cascades to the reconnect attempt getting - * cancelled, but there is a slight race window, where - * the reconnect attempt is already enqueued, but the - * listener has not yet been notified -- if cancellation - * happens at that point, we need to catch it here. - */ - if (!isCancelled()) { - if (sf.isSuccess()) { - connect(); - } else { - setFailure(sf.cause()); - } - } - } - } - }); - } else { - /* - * FIXME: BUG-190: we have a slight race window with cancellation - * here. Analyze and define its semantics. - */ - ReconnectPromise.this.strategy.reconnectSuccessful(); - negotiationFinished.set(true); - } - } - } - }); + return false; } - private final ClosedChannelListener closedChannelListener = new ClosedChannelListener(); - - class ClosedChannelListener implements Closeable, FutureListener { + /** + * Channel handler that responds to channelInactive event and reconnects the session. + * Only if the initial connection was successfully established and promise was not canceled. + */ + private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter { + private final ReconnectPromise promise; - private final AtomicBoolean stop = new AtomicBoolean(false); + public ClosedChannelHandler(final ReconnectPromise promise) { + this.promise = promise; + } @Override - public void operationComplete(final Future future) throws Exception { - if (stop.get()) { + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + if (promise.isCancelled()) { return; } - // Start reconnecting crashed session after negotiation was successful - if (!negotiationFinished.get()) { + // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen. + // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect + if (promise.isInitialConnectFinished() == false) { return; } - connect(); - } - - @Override - public void close() { - this.stop.set(true); + LOG.debug("Reconnecting after connection to {} was dropped", promise.address); + promise.connect(); } } - private void addChannelClosedListener(final ChannelFuture channelFuture) { - channelFuture.addListener(closedChannelListener); - } - - @Override - public synchronized boolean cancel(final boolean mayInterruptIfRunning) { - closedChannelListener.close(); - - if (super.cancel(mayInterruptIfRunning)) { - this.pending.cancel(mayInterruptIfRunning); - return true; - } - - return false; - } } diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java index 3c429fc774..a756a0da7e 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java @@ -10,7 +10,7 @@ package org.opendaylight.protocol.framework; import java.util.EventListener; /** - * Listener that receives session state informations. This interface should be + * Listener that receives session state information. This interface should be * implemented by a protocol specific abstract class, that is extended by * a final class that implements the methods. */ diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java index bead1ee49e..63026e384c 100644 --- a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java +++ b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java @@ -9,6 +9,14 @@ package org.opendaylight.protocol.framework; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; @@ -16,50 +24,139 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; - +import io.netty.util.concurrent.SucceededFuture; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class ServerTest { SimpleDispatcher clientDispatcher, dispatcher; - final SimpleSessionListener pce = new SimpleSessionListener(); - SimpleSession session = null; ChannelFuture server = null; InetSocketAddress serverAddress; private NioEventLoopGroup eventLoopGroup; - + // Dedicated loop group for server, needed for testing reconnection client + // With dedicated server group we can simulate session drop by shutting only the server group down + private NioEventLoopGroup serverLoopGroup; @Before public void setUp() { final int port = 10000 + (int)(10000 * Math.random()); serverAddress = new InetSocketAddress("127.0.0.1", port); eventLoopGroup = new NioEventLoopGroup(); + serverLoopGroup = new NioEventLoopGroup(); + } + + @After + public void tearDown() throws IOException, InterruptedException, ExecutionException { + if(server != null) { + this.server.channel().close(); + } + this.eventLoopGroup.shutdownGracefully().get(); + this.serverLoopGroup.shutdownGracefully().get(); + try { + Thread.sleep(500); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } } @Test - public void testConnectionEstablished() throws Exception { + public void testConnectionRefused() throws Exception { + this.clientDispatcher = getClientDispatcher(); + + final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy(); + + this.clientDispatcher.createClient(this.serverAddress, + mockReconnectStrategy, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class)); + } + + @Test + public void testConnectionReestablishInitial() throws Exception { + this.clientDispatcher = getClientDispatcher(); + + final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy(); + + this.clientDispatcher.createClient(this.serverAddress, + mockReconnectStrategy, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class)); + + final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + this.dispatcher = getServerDispatcher(p); + + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + this.server.get(); + + assertEquals(true, p.get(3, TimeUnit.SECONDS)); + } + + @Test + public void testConnectionDrop() throws Exception { final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); - this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { + this.dispatcher = getServerDispatcher(p); + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - p.setSuccess(true); - return new SimpleSessionNegotiator(promise, channel); + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); } - }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + }); + + this.server.get(); + + this.clientDispatcher = getClientDispatcher(); + + final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy(); + this.session = this.clientDispatcher.createClient(this.serverAddress, + reconnectStrategy, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }).get(6, TimeUnit.SECONDS); + + assertEquals(true, p.get(3, TimeUnit.SECONDS)); + + shutdownServer(); + + // No reconnect should be scheduled after server drops connection with not-reconnecting client + verify(reconnectStrategy, times(0)).scheduleReconnect(any(Throwable.class)); + } + + @Test + public void testConnectionReestablishAfterDrop() throws Exception { + final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + + this.dispatcher = getServerDispatcher(p); this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { @Override @@ -70,13 +167,42 @@ public class ServerTest { this.server.get(); - this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { + this.clientDispatcher = getClientDispatcher(); + + final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class); + final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy(); + doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy(); + + this.clientDispatcher.createReconnectingClient(this.serverAddress, + reconnectStrategyFactory, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + assertEquals(true, p.get(3, TimeUnit.SECONDS)); + shutdownServer(); + + verify(reconnectStrategyFactory, timeout(20000).atLeast(2)).createReconnectStrategy(); + } + + @Test + public void testConnectionEstablished() throws Exception { + final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + + this.dispatcher = getServerDispatcher(p); + + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - return new SimpleSessionNegotiator(promise, channel); + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); } - }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + }); + + this.server.get(); + + this.clientDispatcher = getClientDispatcher(); this.session = this.clientDispatcher.createClient(this.serverAddress, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { @@ -93,15 +219,7 @@ public class ServerTest { public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException { final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); - this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { - - @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - p.setSuccess(true); - return new SimpleSessionNegotiator(promise, channel); - } - }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + this.dispatcher = getServerDispatcher(p); this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { @Override @@ -112,13 +230,7 @@ public class ServerTest { this.server.get(); - this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { - @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - return new SimpleSessionNegotiator(promise, channel); - } - }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + this.clientDispatcher = getClientDispatcher(); this.session = this.clientDispatcher.createClient(this.serverAddress, new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { @@ -138,14 +250,89 @@ public class ServerTest { assertFalse(session.isSuccess()); } - @After - public void tearDown() throws IOException, InterruptedException { - this.server.channel().close(); - this.eventLoopGroup.shutdownGracefully(); - try { - Thread.sleep(500); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } + @Test + public void testNegotiationFailedNoReconnect() throws Exception { + final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + + this.dispatcher = getServerDispatcher(p); + + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + this.server.get(); + + this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + + return new SimpleSessionNegotiator(promise, channel) { + @Override + protected void startNegotiation() throws Exception { + negotiationFailed(new IllegalStateException("Negotiation failed")); + } + }; + } + }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + + final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class); + final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy(); + doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy(); + + this.clientDispatcher.createReconnectingClient(this.serverAddress, + reconnectStrategyFactory, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + + // Only one strategy should be created for initial connect, no more = no reconnects + verify(reconnectStrategyFactory, times(1)).createReconnectStrategy(); } + + private SimpleDispatcher getClientDispatcher() { + return new SimpleDispatcher(new SessionNegotiatorFactory() { + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + return new SimpleSessionNegotiator(promise, channel); + } + }, new DefaultPromise(GlobalEventExecutor.INSTANCE), eventLoopGroup); + } + + private ReconnectStrategy getMockedReconnectStrategy() throws Exception { + final ReconnectStrategy mockReconnectStrategy = mock(ReconnectStrategy.class); + final Future future = new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null); + doReturn(future).when(mockReconnectStrategy).scheduleReconnect(any(Throwable.class)); + doReturn(5000).when(mockReconnectStrategy).getConnectTimeout(); + doNothing().when(mockReconnectStrategy).reconnectSuccessful(); + return mockReconnectStrategy; + } + + + private void shutdownServer() throws InterruptedException, ExecutionException { + // Shutdown server + server.channel().close().get(); + // Closing server channel does not close established connections, eventLoop has to be closed as well to simulate dropped session + serverLoopGroup.shutdownGracefully().get(); + } + + private SimpleDispatcher getServerDispatcher(final Promise p) { + return new SimpleDispatcher(new SessionNegotiatorFactory() { + + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + p.setSuccess(true); + return new SimpleSessionNegotiator(promise, channel); + } + }, null, serverLoopGroup); + } + } diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java index 12aac9ecc5..d83738520c 100644 --- a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java @@ -54,6 +54,10 @@ public class SimpleDispatcher extends AbstractDispatcher createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory strategy, final SessionListenerFactory listenerFactory) { + return super.createReconnectingClient(address, strategy, new SimplePipelineInitializer(listenerFactory)); + } + public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory listenerFactory) { return super.createServer(address, new SimplePipelineInitializer(listenerFactory)); } -- 2.36.6