BUG-7673: Improve synchonization under BGP/PCEP Session 83/51083/3
authorClaudio D. Gasparini <cgaspari@cisco.com>
Thu, 26 Jan 2017 17:46:31 +0000 (18:46 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 27 Jan 2017 13:40:04 +0000 (13:40 +0000)
- Improve synchonization under BGP/PCEP Session
- Code clean up

Change-Id: I93851323232fa9df0c23e1e23751320cdf578031
Signed-off-by: Claudio D. Gasparini <cgaspari@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java
pcep/impl/src/test/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImplTest.java
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCsBuilder.java
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/protocol/PCCDispatcherImpl.java
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/protocol/PCCReconnectPromise.java
pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PCCDispatcherImplTest.java

index 01718ebf9dcce688df9737c2fb4672234dee52e2..0d180f5ffa64f6234b7d1177687114df327ecc00 100644 (file)
@@ -28,7 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
+public final class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
     private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
     private static final int CONNECT_TIMEOUT = 5000;
 
@@ -36,6 +36,7 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     private final int retryTimer;
     private final Bootstrap bootstrap;
     private final BGPPeerRegistry peerRegistry;
+    @GuardedBy("this")
     private final AutoCloseable listenerRegistration;
     @GuardedBy("this")
     private ChannelFuture pending;
@@ -45,15 +46,16 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     private boolean connectSkipped;
 
 
-    public BGPProtocolSessionPromise(InetSocketAddress remoteAddress, int retryTimer, Bootstrap bootstrap, BGPPeerRegistry peerRegistry) {
+    public BGPProtocolSessionPromise(@Nonnull final InetSocketAddress remoteAddress, final int retryTimer,
+        @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry) {
         super(GlobalEventExecutor.INSTANCE);
         this.address = Preconditions.checkNotNull(remoteAddress);
         this.retryTimer = retryTimer;
         this.bootstrap = Preconditions.checkNotNull(bootstrap);
         this.peerRegistry = Preconditions.checkNotNull(peerRegistry);
-        this.listenerRegistration = this.peerRegistry
-                .registerPeerSessionListener(new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
-                        StrictBGPPeerRegistry.getIpAddress(this.address)));
+        this.listenerRegistration = this.peerRegistry.registerPeerSessionListener(
+            new BGPProtocolSessionPromise.PeerRegistrySessionListenerImpl(this,
+                StrictBGPPeerRegistry.getIpAddress(this.address)));
     }
 
     public synchronized void connect() {
@@ -77,7 +79,7 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
             final ChannelFuture connectFuture = this.bootstrap.connect();
             connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
             this.pending = connectFuture;
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.info("Failed to connect to {}", this.address, e);
             this.setFailure(e);
         }
@@ -92,9 +94,8 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
 
         final BGPProtocolSessionPromise lock = this;
         final EventLoop loop = this.pending.channel().eventLoop();
-        loop.schedule(new Runnable() {
-            @Override
-            public void run() {
+        loop.schedule(() -> {
+            synchronized (BGPProtocolSessionPromise.this) {
                 if (BGPProtocolSessionPromise.this.peerSessionPresent) {
                     LOG.debug("Connection to {} already exists", BGPProtocolSessionPromise.this.address);
                     BGPProtocolSessionPromise.this.connectSkipped = true;
@@ -123,7 +124,7 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         }
     }
 
-    private void closePeerSessionListener() {
+    private synchronized void closePeerSessionListener() {
         try {
             this.listenerRegistration.close();
         } catch (final Exception e) {
@@ -138,9 +139,10 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     }
 
     private class BootstrapConnectListener implements ChannelFutureListener {
+        @GuardedBy("this")
         private final Object lock;
 
-        public BootstrapConnectListener(final Object lock) {
+        BootstrapConnectListener(final Object lock) {
             this.lock = lock;
         }
 
@@ -165,6 +167,7 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
     }
 
     private class PeerRegistrySessionListenerImpl implements PeerRegistrySessionListener {
+        @GuardedBy("this")
         private final Object lock;
         private final IpAddress peerAddress;
 
index e69f97bf4c9fb42fd40b17fa19465569a91c1840..feadee1076dd66be664483788dce1d0543bea082 100644 (file)
@@ -16,10 +16,8 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
@@ -36,10 +34,9 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private final ChannelPipelineInitializer initializer;
     private BGPProtocolSessionPromise<S> pending;
 
-    public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
-                               final int retryTimer, final Bootstrap bootstrap,
-                               final BGPPeerRegistry peerRegistry,
-                               final ChannelPipelineInitializer initializer) {
+    public BGPReconnectPromise(@Nonnull final EventExecutor executor, @Nonnull final InetSocketAddress address,
+        final int retryTimer, @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry,
+        @Nonnull final ChannelPipelineInitializer initializer) {
         super(executor);
         this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
@@ -54,29 +51,23 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         }
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
-            @Override
-            public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
-                // add closed channel handler
-                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
-                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
-                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
-                channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
-            }
+        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, (channel, promise) -> {
+            this.initializer.initializeChannel(channel, promise);
+            // add closed channel handler
+            // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+            // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+            // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+            channel.pipeline().addLast(new ClosedChannelHandler(this));
         });
 
-        this.pending.addListener(new GenericFutureListener<Future<Object>>() {
-            @Override
-            public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
-                    BGPReconnectPromise.this.setFailure(future.cause());
-                }
+        this.pending.addListener(future -> {
+            if (!future.isSuccess() && !this.isDone()) {
+                this.setFailure(future.cause());
             }
         });
     }
 
-    public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
+    private BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
             final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
         final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@@ -95,12 +86,12 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     /**
      * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
      */
-    private boolean isInitialConnectFinished() {
+    private  synchronized boolean isInitialConnectFinished() {
         Preconditions.checkNotNull(this.pending);
         return this.pending.isDone() && this.pending.isSuccess();
     }
 
-    private void reconnect() {
+    private synchronized void reconnect() {
         Preconditions.checkNotNull(this.pending);
         this.pending.reconnect();
     }
@@ -122,7 +113,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
         private final BGPReconnectPromise promise;
 
-        public ClosedChannelHandler(final BGPReconnectPromise promise) {
+        ClosedChannelHandler(final BGPReconnectPromise promise) {
             this.promise = promise;
         }
 
index 06105f8239771de3097d0879216c18f62bbab765..91d734a8b4ae79dd42b7da6a95858230977b62cd 100644 (file)
@@ -28,6 +28,8 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.concepts.KeyMapping;
 import org.opendaylight.protocol.pcep.PCEPDispatcher;
 import org.opendaylight.protocol.pcep.PCEPPeerProposal;
@@ -45,11 +47,10 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     private static final Integer SOCKET_BACKLOG_SIZE = 128;
     private final PCEPSessionNegotiatorFactory snf;
     private final PCEPHandlerFactory hf;
-
-
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final EventExecutor executor;
+    @GuardedBy("this")
     private Optional<KeyMapping> keys;
 
     /**
@@ -60,9 +61,9 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
      * @param bossGroup accepts an incoming connection
      * @param workerGroup handles the traffic of accepted connection
      */
-    public PCEPDispatcherImpl(final MessageRegistry registry,
-            final PCEPSessionNegotiatorFactory negotiatorFactory,
-            final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+    public PCEPDispatcherImpl(@Nonnull final MessageRegistry registry,
+        @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
+        @Nonnull final EventLoopGroup bossGroup, @Nonnull  final EventLoopGroup workerGroup) {
         this.snf = Preconditions.checkNotNull(negotiatorFactory);
         this.hf = new PCEPHandlerFactory(registry);
         if (Epoll.isAvailable()) {
@@ -76,20 +77,20 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     }
 
     @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address,
-            final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
-        return createServer(address, Optional.<KeyMapping>absent(), listenerFactory, peerProposal);
+    public final synchronized ChannelFuture createServer(final InetSocketAddress address,
+        final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
+        return createServer(address, Optional.absent(), listenerFactory, peerProposal);
     }
 
     @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
-            final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
+    public final synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
+        final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
         this.keys = keys;
 
         final ChannelPipelineInitializer initializer = (ch, promise) -> {
-            ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getDecoders());
-            ch.pipeline().addLast("negotiator", PCEPDispatcherImpl.this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
-            ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getEncoders());
+            ch.pipeline().addLast(this.hf.getDecoders());
+            ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
+            ch.pipeline().addLast(this.hf.getEncoders());
         };
 
         final ServerBootstrap b = createServerBootstrap(initializer);
@@ -100,7 +101,7 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
         return f;
     }
 
-    protected ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
+    synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
         final ServerBootstrap b = new ServerBootstrap();
         b.childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
@@ -137,7 +138,7 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     }
 
     @Override
-    public void close() {
+    public final void close() {
         if (Epoll.isAvailable()) {
             this.workerGroup.shutdownGracefully().awaitUninterruptibly();
             this.bossGroup.shutdownGracefully().awaitUninterruptibly();
@@ -149,7 +150,7 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     }
 
     @Override
-    public PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
+    public final PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
         return this.snf;
     }
 }
index 69c40aa8a96c21a79274d1d01d3f8685b2ceff8e..82d74c86e45fac736eaf3cee5ab500d70b1b786d 100755 (executable)
@@ -40,7 +40,6 @@ import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl.ChannelPipelineInitializer;
 import org.opendaylight.protocol.pcep.spi.MessageRegistry;
 import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
 import org.opendaylight.protocol.util.InetSocketAddressUtil;
@@ -54,7 +53,8 @@ public class PCEPDispatcherImplTest {
     private PCEPDispatcherImpl dispatcher;
     private PCEPDispatcherImpl disp2Spy;
 
-    @Mock private Channel mockChannel;
+    @Mock
+    private Channel mockChannel;
 
     private PCCMock pccMock;
 
@@ -62,7 +62,8 @@ public class PCEPDispatcherImplTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         final List<PCEPCapability> capList = new ArrayList<>();
-        final PCEPSessionProposalFactory sessionProposal = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE, capList);
+        final PCEPSessionProposalFactory sessionProposal = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE,
+            capList);
         final EventLoopGroup eventLoopGroup;
         if (Epoll.isAvailable()) {
             eventLoopGroup = new EpollEventLoopGroup();
@@ -70,16 +71,16 @@ public class PCEPDispatcherImplTest {
             eventLoopGroup = new NioEventLoopGroup();
         }
         final MessageRegistry msgReg = ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
-                .getMessageHandlerRegistry();
+            .getMessageHandlerRegistry();
         this.dispatcher = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0),
-                eventLoopGroup, eventLoopGroup);
+            eventLoopGroup, eventLoopGroup);
 
         Mockito.doReturn("mockChannel").when(this.mockChannel).toString();
         final PCEPDispatcherImpl dispatcher2 = new PCEPDispatcherImpl(msgReg, new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0), eventLoopGroup, eventLoopGroup);
         this.disp2Spy = Mockito.spy(dispatcher2);
 
         this.pccMock = new PCCMock(new DefaultPCEPSessionNegotiatorFactory(sessionProposal, 0),
-                new PCEPHandlerFactory(msgReg));
+            new PCEPHandlerFactory(msgReg));
     }
 
     @Test
@@ -89,14 +90,12 @@ public class PCEPDispatcherImplTest {
         final InetSocketAddress clientAddr1 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
         final InetSocketAddress clientAddr2 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
         final ChannelFuture futureChannel = this.dispatcher.createServer(serverAddr,
-                () -> new SimpleSessionListener(), null);
+            SimpleSessionListener::new, null);
         final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr1,
-                RETRY_TIMER, CONNECT_TIMEOUT,
-                () -> new SimpleSessionListener()).get();
+            RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
 
         final PCEPSessionImpl session2 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr2,
-                RETRY_TIMER, CONNECT_TIMEOUT,
-                () -> new SimpleSessionListener()).get();
+            RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
 
         Assert.assertTrue(futureChannel.channel().isActive());
         Assert.assertEquals(clientAddr1.getAddress().getHostAddress(), session1.getPeerPref().getIpAddress());
@@ -117,16 +116,13 @@ public class PCEPDispatcherImplTest {
         final int port = InetSocketAddressUtil.getRandomPort();
         final InetSocketAddress serverAddr = new InetSocketAddress("0.0.0.0", port);
         final InetSocketAddress clientAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
-        this.dispatcher.createServer(serverAddr,
-                () -> new SimpleSessionListener(), null);
+        this.dispatcher.createServer(serverAddr, SimpleSessionListener::new, null);
         final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
-                RETRY_TIMER, CONNECT_TIMEOUT,
-                () -> new SimpleSessionListener()).get();
+            RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
 
         try {
-            this.pccMock.createClient(clientAddr,
-                    RETRY_TIMER, CONNECT_TIMEOUT,
-                    () -> new SimpleSessionListener()).get();
+            this.pccMock.createClient(clientAddr, RETRY_TIMER, CONNECT_TIMEOUT,
+                SimpleSessionListener::new).get();
             Assert.fail();
         } catch (final ExecutionException e) {
             Assert.assertTrue(e.getMessage().contains("A conflicting session for address"));
@@ -140,10 +136,9 @@ public class PCEPDispatcherImplTest {
         final int port = InetSocketAddressUtil.getRandomPort();
         final InetSocketAddress clientAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
         this.dispatcher.createServer(new InetSocketAddress("0.0.0.0", port),
-                () -> new SimpleSessionListener(), null);
+            SimpleSessionListener::new, null);
         final PCEPSessionImpl session1 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
-                RETRY_TIMER, CONNECT_TIMEOUT,
-                () -> new SimpleSessionListener()).get();
+            RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
 
         Assert.assertEquals(clientAddr.getAddress(), session1.getRemoteAddress());
         Assert.assertEquals(DEAD_TIMER, session1.getDeadTimerValue().shortValue());
@@ -151,8 +146,7 @@ public class PCEPDispatcherImplTest {
         session1.close();
 
         final PCEPSessionImpl session2 = (PCEPSessionImpl) this.pccMock.createClient(clientAddr,
-                RETRY_TIMER, CONNECT_TIMEOUT,
-                () -> new SimpleSessionListener()).get();
+            RETRY_TIMER, CONNECT_TIMEOUT, SimpleSessionListener::new).get();
 
         Assert.assertEquals(clientAddr.getAddress(), session1.getRemoteAddress());
         Assert.assertEquals(DEAD_TIMER, session2.getDeadTimerValue().shortValue());
@@ -166,11 +160,11 @@ public class PCEPDispatcherImplTest {
         final int port = InetSocketAddressUtil.getRandomPort();
         final InetSocketAddress clientAddr1 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
         final InetSocketAddress clientAddr2 = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
-        final KeyMapping keys = KeyMapping.getKeyMapping(clientAddr1.getAddress(), new String("CLIENT1_ADDRESS"));
-        keys.put(clientAddr2.getAddress(), new String("CLIENT2_ADDRESS").getBytes() );
+        final KeyMapping keys = KeyMapping.getKeyMapping(clientAddr1.getAddress(), "CLIENT1_ADDRESS");
+        keys.put(clientAddr2.getAddress(), "CLIENT2_ADDRESS".getBytes());
 
         final ChannelFuture futureChannel = this.disp2Spy.createServer(new InetSocketAddress("0.0.0.0", port),
-                () -> new SimpleSessionListener(), null);
+            SimpleSessionListener::new, null);
         Mockito.verify(this.disp2Spy).createServerBootstrap(Mockito.any(PCEPDispatcherImpl.ChannelPipelineInitializer.class));
     }
 
@@ -195,19 +189,21 @@ public class PCEPDispatcherImplTest {
         }
 
         public Future<PCEPSession> createClient(final InetSocketAddress address, final int retryTimer,
-                final int connectTimeout, final PCEPSessionListenerFactory listenerFactory) {
-            return createClient(address, retryTimer, connectTimeout, (ChannelPipelineInitializer) (ch, promise) -> {
-                ch.pipeline().addLast(PCCMock.this.factory.getDecoders());
-                ch.pipeline().addLast("negotiator", PCCMock.this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, null));
-                ch.pipeline().addLast(PCCMock.this.factory.getEncoders());
+            final int connectTimeout, final PCEPSessionListenerFactory listenerFactory) {
+            return createClient(address, retryTimer, connectTimeout, (ch, promise) -> {
+                ch.pipeline().addLast(this.factory.getDecoders());
+                ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch,
+                    promise, null));
+                ch.pipeline().addLast(this.factory.getEncoders());
             });
         }
 
         Future<PCEPSession> createClient(final InetSocketAddress address, final int retryTimer, final int connectTimeout,
-                final PCEPDispatcherImpl.ChannelPipelineInitializer initializer) {
+            final PCEPDispatcherImpl.ChannelPipelineInitializer initializer) {
             final Bootstrap b = new Bootstrap();
-            final PCEPProtocolSessionPromise p = new PCEPProtocolSessionPromise(this.executor, address, retryTimer, connectTimeout, b);
-            (b.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(true))).handler(new ChannelInitializer<SocketChannel>() {
+            final PCEPProtocolSessionPromise p = new PCEPProtocolSessionPromise(this.executor, address, retryTimer,
+                connectTimeout, b);
+            (b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)).handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(final SocketChannel ch) {
                     initializer.initializeChannel(ch, p);
index 6710f04c54e897160bdc220334a0d24321e85ff5..6482849503f48503a41c292f23846895b5e2f1db 100644 (file)
@@ -75,7 +75,7 @@ final class PCCsBuilder {
         InetAddress currentAddress = this.localAddress.getAddress();
         this.pccDispatcher = new PCCDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
                 .getMessageHandlerRegistry());
-        if(timerHandler.isPresent()) {
+        if (timerHandler.isPresent()) {
             timerHandler.get().setPCCDispatcher(this.pccDispatcher);
         }
         for (int i = 0; i < this.pccCount; i++) {
index 64d6aed2329b7e426d413322d83260398ba13007..6d74dc714d52718e0d83794adf33d0511deb4152 100755 (executable)
@@ -107,7 +107,7 @@ public final class PCCDispatcherImpl implements PCCDispatcher, AutoCloseable {
         return promise;
     }
 
-    private void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
+    private static void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
         if (Epoll.isAvailable()) {
             bootstrap.channel(EpollSocketChannel.class);
             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
index 48bb88832584a81f0b136555cbd2df64ad2e6719..9399b3f2ee1da592dd5f12c43413c5de35125c54 100755 (executable)
@@ -50,7 +50,7 @@ final class PCCReconnectPromise extends DefaultPromise<PCEPSession> {
             this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
             this.b.remoteAddress(this.address);
             final ChannelFuture cf = this.b.connect();
-            cf.addListener(new BootstrapConnectListener(PCCReconnectPromise.this));
+            cf.addListener(new BootstrapConnectListener(this));
             this.pending = cf;
         } catch (final Exception e) {
             LOG.info("Failed to connect to {}", this.address, e);
@@ -114,7 +114,7 @@ final class PCCReconnectPromise extends DefaultPromise<PCEPSession> {
                         synchronized (PCCReconnectPromise.this) {
                             PCCReconnectPromise.LOG.debug("Attempting to connect to {}", PCCReconnectPromise.this.address);
                             final Future reconnectFuture = PCCReconnectPromise.this.b.connect();
-                            reconnectFuture.addListener(BootstrapConnectListener.this);
+                            reconnectFuture.addListener(this);
                             PCCReconnectPromise.this.pending = reconnectFuture;
                         }
                     }, PCCReconnectPromise.this.retryTimer, TimeUnit.SECONDS);
index 98c765d47425adf08c31e1628d3fcdd8f345f79a..1963453619e8e7e3b08ef139de6aba379194dbc1 100644 (file)
@@ -98,5 +98,6 @@ public class PCCDispatcherImplTest {
         final TestingSessionListener sl2 = checkSessionListenerNotNull(slf2, this.clientAddress.getAddress().getHostAddress());
         Assert.assertNotNull(sl2.getSession());
         Assert.assertTrue(sl2.isUp());
+        channel2.close();
     }
 }