Refactor BGP session 28/28428/2
authorIveta Halanova <ihalanov@cisco.com>
Thu, 15 Oct 2015 09:05:54 +0000 (11:05 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 19 Oct 2015 15:33:17 +0000 (15:33 +0000)
refactored attribute, parameter, method names

Change-Id: I9840f65d7d52637cdf4e063642281c26edb22e50
Signed-off-by: Iveta Halanova <ihalanov@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPProtocolSessionPromise.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/protocol/BGPReconnectPromise.java

index fcc1417af2495c7bb13fd1b4ef4ecac2f8a55ebc..8b0b7c756d4ebe29a5bbeade56cffca1bb509c62 100644 (file)
@@ -48,9 +48,9 @@ import org.slf4j.LoggerFactory;
 public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(BGPDispatcherImpl.class);
     private static final int SOCKET_BACKLOG_SIZE = 128;
-    private final MD5ServerChannelFactory<?> scf;
-    private final MD5ChannelFactory<?> cf;
-    private final BGPHandlerFactory hf;
+    private final MD5ServerChannelFactory<?> serverChannelFactory;
+    private final MD5ChannelFactory<?> channelFactory;
+    private final BGPHandlerFactory handlerFactory;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final EventExecutor executor;
@@ -64,26 +64,26 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         this.bossGroup = Preconditions.checkNotNull(bossGroup);
         this.workerGroup = Preconditions.checkNotNull(workerGroup);
         this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
-        this.hf = new BGPHandlerFactory(messageRegistry);
-        this.cf = cf;
-        this.scf = scf;
+        this.handlerFactory = new BGPHandlerFactory(messageRegistry);
+        this.channelFactory = cf;
+        this.serverChannelFactory = scf;
         this.keys = null;
     }
 
     @Override
     public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress address, final BGPPeerRegistry listener, final ReconnectStrategy strategy) {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(listener);
-        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf);
-
-        final Bootstrap b = new Bootstrap();
-        final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, b);
-        b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
-        b.handler(BGPChannel.createChannelInitializer(initializer, p));
-        this.customizeBootstrap(b);
-        setWorkerGroup(b);
-        p.connect();
+        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
+
+        final Bootstrap bootstrap = new Bootstrap();
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+        bootstrap.handler(BGPChannel.createChannelInitializer(initializer, sessionPromise));
+        this.customizeBootstrap(bootstrap);
+        setWorkerGroup(bootstrap);
+        sessionPromise.connect();
         LOG.debug("Client created.");
-        return p;
+        return sessionPromise;
     }
 
     @Override
@@ -101,78 +101,78 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         final BGPClientSessionNegotiatorFactory snf = new BGPClientSessionNegotiatorFactory(peerRegistry);
         this.keys = keys;
 
-        final Bootstrap b = new Bootstrap();
-        final BGPReconnectPromise p = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
-            connectStrategyFactory, b, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf));
-        b.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
-        this.customizeBootstrap(b);
-        setWorkerGroup(b);
-        p.connect();
+        final Bootstrap bootstrap = new Bootstrap();
+        final BGPReconnectPromise reconnectPromise = new BGPReconnectPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE, address,
+            connectStrategyFactory, bootstrap, BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf));
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+        this.customizeBootstrap(bootstrap);
+        setWorkerGroup(bootstrap);
+        reconnectPromise.connect();
 
         this.keys = null;
 
-        return p;
+        return reconnectPromise;
     }
 
     @Override
     public ChannelFuture createServer(final BGPPeerRegistry registry, final InetSocketAddress address) {
         final BGPServerSessionNegotiatorFactory snf = new BGPServerSessionNegotiatorFactory(registry);
-        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.hf, snf);
-        final ServerBootstrap b = new ServerBootstrap();
-        b.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
-        b.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
-        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        this.customizeBootstrap(b);
-
-        final ChannelFuture f = b.bind(address);
-        LOG.debug("Initiated server {} at {}.", f, address);
-        return f;
+        final ChannelPipelineInitializer initializer = BGPChannel.createChannelPipelineInitializer(BGPDispatcherImpl.this.handlerFactory, snf);
+        final ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap.childHandler(BGPChannel.createChannelInitializer(initializer, new DefaultPromise(BGPDispatcherImpl.this.executor)));
+        serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(SOCKET_BACKLOG_SIZE));
+        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        this.customizeBootstrap(serverBootstrap);
+
+        final ChannelFuture channelFuture = serverBootstrap.bind(address);
+        LOG.debug("Initiated server {} at {}.", channelFuture, address);
+        return channelFuture;
     }
 
-    protected void customizeBootstrap(final Bootstrap b) {
+    protected void customizeBootstrap(final Bootstrap bootstrap) {
         if (this.keys != null && !this.keys.isEmpty()) {
-            if (this.cf == null) {
+            if (this.channelFactory == null) {
                 throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
             }
-            b.channelFactory(this.cf);
-            b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
+            bootstrap.channelFactory(this.channelFactory);
+            bootstrap.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
         }
 
         // Make sure we are doing round-robin processing
-        b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
     }
 
-    private void customizeBootstrap(final ServerBootstrap b) {
+    private void customizeBootstrap(final ServerBootstrap serverBootstrap) {
         if (this.keys != null && !this.keys.isEmpty()) {
-            if (this.scf == null) {
+            if (this.serverChannelFactory == null) {
                 throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
             }
-            b.channelFactory(this.scf);
-            b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
+            serverBootstrap.channelFactory(this.serverChannelFactory);
+            serverBootstrap.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
         }
 
         // Make sure we are doing round-robin processing
-        b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        serverBootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
 
-        if (b.group() == null) {
-            b.group(this.bossGroup, this.workerGroup);
+        if (serverBootstrap.group() == null) {
+            serverBootstrap.group(this.bossGroup, this.workerGroup);
         }
 
         try {
-            b.channel(NioServerSocketChannel.class);
+            serverBootstrap.channel(NioServerSocketChannel.class);
         } catch (final IllegalStateException e) {
-            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+            LOG.trace("Not overriding channelFactory on bootstrap {}", serverBootstrap, e);
         }
     }
 
-    private void setWorkerGroup(final Bootstrap b) {
-        if (b.group() == null) {
-            b.group(this.workerGroup);
+    private void setWorkerGroup(final Bootstrap bootstrap) {
+        if (bootstrap.group() == null) {
+            bootstrap.group(this.workerGroup);
         }
         try {
-            b.channel(NioSocketChannel.class);
+            bootstrap.channel(NioSocketChannel.class);
         } catch (final IllegalStateException e) {
-            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+            LOG.trace("Not overriding channelFactory on bootstrap {}", bootstrap, e);
         }
     }
 
@@ -187,10 +187,10 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
             createChannelPipelineInitializer(final BGPHandlerFactory hf, final T snf) {
             return new ChannelPipelineInitializer<S>() {
                 @Override
-                public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
-                    ch.pipeline().addLast(hf.getDecoders());
-                    ch.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(ch, promise));
-                    ch.pipeline().addLast(hf.getEncoders());
+                public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+                    channel.pipeline().addLast(hf.getDecoders());
+                    channel.pipeline().addLast(NEGOTIATOR, snf.getSessionNegotiator(channel, promise));
+                    channel.pipeline().addLast(hf.getEncoders());
                 }
             };
         }
@@ -198,8 +198,8 @@ public class BGPDispatcherImpl implements BGPDispatcher, AutoCloseable {
         public static <S extends BGPSession> ChannelHandler createChannelInitializer(final ChannelPipelineInitializer initializer, final Promise<S> promise) {
             return new ChannelInitializer<SocketChannel>() {
                 @Override
-                protected void initChannel(final SocketChannel ch) {
-                    initializer.initializeChannel(ch, promise);
+                protected void initChannel(final SocketChannel channel) {
+                    initializer.initializeChannel(channel, promise);
                 }
             };
         }
index 7adb1fb027adb5082fac54f4c26dec29db4457cb..8a19b897a35287d1f3f3fa3597f9fae6d044aa99 100644 (file)
@@ -27,31 +27,31 @@ import org.slf4j.LoggerFactory;
 public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultPromise<S> {
     private static final Logger LOG = LoggerFactory.getLogger(BGPProtocolSessionPromise.class);
     private final ReconnectStrategy strategy;
-    private final Bootstrap b;
+    private final Bootstrap bootstrap;
 
     private InetSocketAddress address;
     @GuardedBy("this")
     private Future<?> pending;
 
-    public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap b) {
+    public BGPProtocolSessionPromise(EventExecutor executor, InetSocketAddress address, ReconnectStrategy strategy, Bootstrap bootstrap) {
         super(executor);
         this.strategy = Preconditions.checkNotNull(strategy);
         this.address = Preconditions.checkNotNull(address);
-        this.b = Preconditions.checkNotNull(b);
+        this.bootstrap = Preconditions.checkNotNull(bootstrap);
     }
 
     public synchronized void connect() {
         final BGPProtocolSessionPromise lock = this;
 
         try {
-            int e = this.strategy.getConnectTimeout();
-            LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(e));
+            int timeout = this.strategy.getConnectTimeout();
+            LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(timeout));
             if (this.address.isUnresolved()) {
                 this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
             }
 
-            this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, e);
-            final ChannelFuture connectFuture = this.b.connect(this.address);
+            this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+            final ChannelFuture connectFuture = this.bootstrap.connect(this.address);
             connectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener(lock));
             this.pending = connectFuture;
         } catch (Exception e) {
@@ -86,23 +86,23 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
         }
 
         @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
+        public void operationComplete(final ChannelFuture channelFuture) throws Exception {
             synchronized (this.lock) {
                 BGPProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
-                Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(cf));
+                Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(channelFuture));
                 if (BGPProtocolSessionPromise.this.isCancelled()) {
-                    if (cf.isSuccess()) {
+                    if (channelFuture.isSuccess()) {
                         BGPProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
-                        cf.channel().close();
+                        channelFuture.channel().close();
                     }
 
-                } else if (cf.isSuccess()) {
+                } else if (channelFuture.isSuccess()) {
                     BGPProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
                 } else {
-                    BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, cf.cause());
-                    final Future rf = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
-                    rf.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
-                    BGPProtocolSessionPromise.this.pending = rf;
+                    BGPProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", BGPProtocolSessionPromise.this.address, channelFuture.cause());
+                    final Future reconnectFuture = BGPProtocolSessionPromise.this.strategy.scheduleReconnect(channelFuture.cause());
+                    reconnectFuture.addListener(new BGPProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
+                    BGPProtocolSessionPromise.this.pending = reconnectFuture;
                 }
             }
         }
@@ -112,14 +112,14 @@ public class BGPProtocolSessionPromise<S extends BGPSession> extends DefaultProm
             }
 
             @Override
-            public void operationComplete(final Future<Void> sf) {
+            public void operationComplete(final Future<Void> sessionFuture) {
                 synchronized (BootstrapConnectListener.this.lock) {
-                    Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sf));
+                    Preconditions.checkState(BGPProtocolSessionPromise.this.pending.equals(sessionFuture));
                     if (!BGPProtocolSessionPromise.this.isCancelled()) {
-                        if (sf.isSuccess()) {
+                        if (sessionFuture.isSuccess()) {
                             BGPProtocolSessionPromise.this.connect();
                         } else {
-                            BGPProtocolSessionPromise.this.setFailure(sf.cause());
+                            BGPProtocolSessionPromise.this.setFailure(sessionFuture.cause());
                         }
                     }
 
index d2fa76e02a7de1bd06fcee63b9923d414558cd2f..dc8479edfec2b739181a814abc4076fb6fbca581 100644 (file)
@@ -32,30 +32,30 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
 
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
-    private final Bootstrap b;
+    private final Bootstrap bootstrap;
     private final ChannelPipelineInitializer initializer;
     private final EventExecutor executor;
     private Future<S> pending;
 
     public BGPReconnectPromise(final EventExecutor executor, final InetSocketAddress address,
-                               final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b,
+                               final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap bootstrap,
                                final ChannelPipelineInitializer initializer) {
         super(executor);
         this.executor = executor;
-        this.b = b;
+        this.bootstrap = bootstrap;
         this.initializer = Preconditions.checkNotNull(initializer);
         this.address = Preconditions.checkNotNull(address);
         this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
     }
 
     public synchronized void connect() {
-        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+        final ReconnectStrategy reconnectStrategy = this.strategyFactory.createReconnectStrategy();
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        pending = createClient(this.address, cs, b, new ChannelPipelineInitializer<S>() {
+        this.pending = connectSessionPromise(this.address, reconnectStrategy, this.bootstrap, new ChannelPipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                initializer.initializeChannel(channel, promise);
+                BGPReconnectPromise.this.initializer.initializeChannel(channel, promise);
                 // add closed channel handler
                 // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
                 // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
@@ -64,9 +64,9 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
             }
         });
 
-        pending.addListener(new GenericFutureListener<Future<Object>>() {
+        this.pending.addListener(new GenericFutureListener<Future<Object>>() {
             @Override
-            public void operationComplete(Future<Object> future) throws Exception {
+            public void operationComplete(final Future<Object> future) throws Exception {
                 if (!future.isSuccess()) {
                     BGPReconnectPromise.this.setFailure(future.cause());
                 }
@@ -74,38 +74,37 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         });
     }
 
-    public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
+    public Future<S> connectSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap,
                                   final ChannelPipelineInitializer initializer) {
-        final BGPProtocolSessionPromise p = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
+        final BGPProtocolSessionPromise sessionPromise = new BGPProtocolSessionPromise(this.executor, address, strategy, bootstrap);
         final ChannelHandler chInit = new ChannelInitializer<SocketChannel>() {
             @Override
-            protected void initChannel(SocketChannel ch) {
-                initializer.initializeChannel(ch, p);
+            protected void initChannel(final SocketChannel channel) {
+                initializer.initializeChannel(channel, sessionPromise);
             }
         };
 
         bootstrap.handler(chInit);
-        p.connect();
+        sessionPromise.connect();
         LOG.debug("Client created.");
-        return p;
+        return sessionPromise;
     }
 
     /**
      * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
      */
     private boolean isInitialConnectFinished() {
-        Preconditions.checkNotNull(pending);
-        return pending.isDone() && pending.isSuccess();
+        Preconditions.checkNotNull(this.pending);
+        return this.pending.isDone() && this.pending.isSuccess();
     }
 
     @Override
     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
         if (super.cancel(mayInterruptIfRunning)) {
-            Preconditions.checkNotNull(pending);
+            Preconditions.checkNotNull(this.pending);
             this.pending.cancel(mayInterruptIfRunning);
             return true;
         }
-
         return false;
     }
 
@@ -123,16 +122,16 @@ public class BGPReconnectPromise<S extends BGPSession> extends DefaultPromise<Vo
         @Override
         public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
             // This is the ultimate channel inactive handler, not forwarding
-            if (promise.isCancelled()) {
+            if (this.promise.isCancelled()) {
                 return;
             }
 
-            if (!promise.isInitialConnectFinished()) {
-                LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+            if (!this.promise.isInitialConnectFinished()) {
+                LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
             }
 
-            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
-            promise.connect();
+            LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
+            this.promise.connect();
         }
     }
 }
\ No newline at end of file