BUG-7673: Improve synchonization under BGP/PCEP Session
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / protocol / BGPReconnectPromise.java
index e69f97bf4c9fb42fd40b17fa19465569a91c1840..feadee1076dd66be664483788dce1d0543bea082 100644 (file)
@@ -16,10 +16,8 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import javax.annotation.Nonnull;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry;
 import org.opendaylight.protocol.bgp.rib.impl.spi.ChannelPipelineInitializer;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
@@ -36,10 +34,9 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private final ChannelPipelineInitializer initializer;
     private BGPProtocolSessionPromise<S> pending;
 
-    public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
-                               final int retryTimer, final Bootstrap bootstrap,
-                               final BGPPeerRegistry peerRegistry,
-                               final ChannelPipelineInitializer initializer) {
+    public BGPReconnectPromise(@Nonnull final EventExecutor executor, @Nonnull final InetSocketAddress address,
+        final int retryTimer, @Nonnull final Bootstrap bootstrap, @Nonnull final BGPPeerRegistry peerRegistry,
+        @Nonnull final ChannelPipelineInitializer initializer) {
         super(executor);
         this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
@@ -54,29 +51,23 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         }
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, new ChannelPipelineInitializer<S>() {
-            @Override
-            public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
-                // add closed channel handler
-                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
-                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
-                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
-                channel.pipeline().addLast(new ClosedChannelHandler(BGPReconnectPromise.this));
-            }
+        this.pending = connectSessionPromise(this.address, this.retryTimer, this.bootstrap, this.peerRegistry, (channel, promise) -> {
+            this.initializer.initializeChannel(channel, promise);
+            // add closed channel handler
+            // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+            // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+            // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+            channel.pipeline().addLast(new ClosedChannelHandler(this));
         });
 
-        this.pending.addListener(new GenericFutureListener<Future<Object>>() {
-            @Override
-            public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess() && !BGPReconnectPromise.this.isDone()) {
-                    BGPReconnectPromise.this.setFailure(future.cause());
-                }
+        this.pending.addListener(future -> {
+            if (!future.isSuccess() && !this.isDone()) {
+                this.setFailure(future.cause());
             }
         });
     }
 
-    public BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
+    private BGPProtocolSessionPromise<S> connectSessionPromise(final InetSocketAddress address, final int retryTimer, final Bootstrap bootstrap,
             final BGPPeerRegistry peerRegistry, final ChannelPipelineInitializer initializer) {
         final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(address, retryTimer, bootstrap, peerRegistry);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
@@ -95,12 +86,12 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     /**
      * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
      */
-    private boolean isInitialConnectFinished() {
+    private  synchronized boolean isInitialConnectFinished() {
         Preconditions.checkNotNull(this.pending);
         return this.pending.isDone() && this.pending.isSuccess();
     }
 
-    private void reconnect() {
+    private synchronized void reconnect() {
         Preconditions.checkNotNull(this.pending);
         this.pending.reconnect();
     }
@@ -122,7 +113,7 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
     private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
         private final BGPReconnectPromise promise;
 
-        public ClosedChannelHandler(final BGPReconnectPromise promise) {
+        ClosedChannelHandler(final BGPReconnectPromise promise) {
             this.promise = promise;
         }