protocol-framework: use lambdas 73/57173/2
authorStephen Kitt <skitt@redhat.com>
Tue, 16 May 2017 15:47:17 +0000 (17:47 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 17 May 2017 08:51:35 +0000 (08:51 +0000)
This series of patches uses lambdas instead of anonymous classes for
functional interfaces when possible. Lambdas are replaced with method
references when appropriate.

Change-Id: Ife557068a519c0b2822b9ee5dc4b5150373e0cf8
Signed-off-by: Stephen Kitt <skitt@redhat.com>
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java

index ec315bc0bd5bc3f565332c6c2e97134677ea9ef1..a883eaf4ea1fa55980f3f897d4dcb45ca500c2bf 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.protocol.framework;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -61,15 +60,12 @@ public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSes
      */
     protected final void sendMessage(final M msg) {
         this.channel.writeAndFlush(msg).addListener(
-                new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(final ChannelFuture f) {
-                        if (!f.isSuccess()) {
-                            LOG.info("Failed to send message {}", msg, f.cause());
-                            negotiationFailed(f.cause());
-                        } else {
-                            LOG.trace("Message {} sent to socket", msg);
-                        }
+                (ChannelFutureListener) f -> {
+                    if (!f.isSuccess()) {
+                        LOG.info("Failed to send message {}", msg, f.cause());
+                        negotiationFailed(f.cause());
+                    } else {
+                        LOG.trace("Message {} sent to socket", msg);
                     }
                 });
     }
index ddf5d438f3b3f74d1e915c76be78e2fd85b4fbc8..f9fadb04b0e0c19b8d89444883c71ac0d58a1a5e 100644 (file)
@@ -11,12 +11,10 @@ import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,24 +44,18 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
         final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
-        pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
-            @Override
-            public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                initializer.initializeChannel(channel, promise);
-                // add closed channel handler
-                // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
-                // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
-                // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
-                channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
-            }
+        pending = this.dispatcher.createClient(this.address, cs, b, (channel, promise) -> {
+            initializer.initializeChannel(channel, promise);
+            // add closed channel handler
+            // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+            // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+            // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+            channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
         });
 
-        pending.addListener(new GenericFutureListener<Future<Object>>() {
-            @Override
-            public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess()) {
-                    ReconnectPromise.this.setFailure(future.cause());
-                }
+        pending.addListener((GenericFutureListener<Future<Object>>) future -> {
+            if (!future.isSuccess()) {
+                ReconnectPromise.this.setFailure(future.cause());
             }
         });
     }
index ed60abb05adb9f25e92fe751a4e8162c531da458..6a1607803b9310c1a1a5a43013ff2906b38b7290 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.protocol.framework;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -129,16 +128,13 @@ public final class TimedReconnectStrategy implements ReconnectStrategy {
         this.scheduled = true;
 
         // Schedule a task for the right time. It will also clear the flag.
-        return this.executor.schedule(new Callable<Void>() {
-            @Override
-            public Void call() throws TimeoutException {
-                synchronized (lock) {
-                    Preconditions.checkState(TimedReconnectStrategy.this.scheduled);
-                    TimedReconnectStrategy.this.scheduled = false;
-                }
-
-                return null;
+        return this.executor.schedule(() -> {
+            synchronized (lock) {
+                Preconditions.checkState(TimedReconnectStrategy.this.scheduled);
+                TimedReconnectStrategy.this.scheduled = false;
             }
+
+            return null;
         }, this.lastSleep, TimeUnit.MILLISECONDS);
     }
 
index 331e94949e2506a470ad31dae5de31cb12d256f3..ce82096cfe9e3676d6609c73966e8e7a23d9c495 100644 (file)
@@ -17,7 +17,6 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultPromise;
@@ -76,13 +75,7 @@ public class ServerTest {
 
         final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
 
-        this.clientDispatcher.createClient(this.serverAddress,
-                mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                });
+        this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
 
         Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
     }
@@ -93,25 +86,14 @@ public class ServerTest {
 
         final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
 
-        this.clientDispatcher.createClient(this.serverAddress,
-                mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                });
+        this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
 
         Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
 
         final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
@@ -124,12 +106,7 @@ public class ServerTest {
 
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
@@ -137,12 +114,7 @@ public class ServerTest {
 
         final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
         this.session = this.clientDispatcher.createClient(this.serverAddress,
-                reconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                }).get(6, TimeUnit.SECONDS);
+                reconnectStrategy, SimpleSessionListener::new).get(6, TimeUnit.SECONDS);
 
         assertEquals(true, p.get(3, TimeUnit.SECONDS));
 
@@ -158,12 +130,7 @@ public class ServerTest {
 
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
@@ -174,12 +141,7 @@ public class ServerTest {
         doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
 
         this.clientDispatcher.createReconnectingClient(this.serverAddress,
-                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                });
+                reconnectStrategyFactory, SimpleSessionListener::new);
 
         assertEquals(true, p.get(3, TimeUnit.SECONDS));
         shutdownServer();
@@ -193,24 +155,15 @@ public class ServerTest {
 
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
         this.clientDispatcher = getClientDispatcher();
 
         this.session = this.clientDispatcher.createClient(this.serverAddress,
-                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        }).get(6, TimeUnit.SECONDS);
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+                TimeUnit.SECONDS);
 
         assertEquals(true, p.get(3, TimeUnit.SECONDS));
     }
@@ -221,32 +174,18 @@ public class ServerTest {
 
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
         this.clientDispatcher = getClientDispatcher();
 
         this.session = this.clientDispatcher.createClient(this.serverAddress,
-                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        }).get(6, TimeUnit.SECONDS);
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+                TimeUnit.SECONDS);
 
         final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
-                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new);
         assertFalse(session.isSuccess());
     }
 
@@ -256,40 +195,24 @@ public class ServerTest {
 
         this.dispatcher = getServerDispatcher(p);
 
-        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
-            @Override
-            public SimpleSessionListener getSessionListener() {
-                return new SimpleSessionListener();
-            }
-        });
+        this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
 
         this.server.get();
 
-        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                                                                         final Channel channel, final Promise<SimpleSession> promise) {
-
-                return new SimpleSessionNegotiator(promise, channel) {
+        this.clientDispatcher = new SimpleDispatcher(
+                (factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel) {
                     @Override
                     protected void startNegotiation() throws Exception {
                         negotiationFailed(new IllegalStateException("Negotiation failed"));
                     }
-                };
-            }
-        }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+                }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
 
         final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
         final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
         doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
 
         this.clientDispatcher.createReconnectingClient(this.serverAddress,
-                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
-                    @Override
-                    public SimpleSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                });
+                reconnectStrategyFactory, SimpleSessionListener::new);
 
 
         // Reconnect strategy should be consulted at least twice, for initial connect and reconnect attempts after drop
@@ -297,13 +220,7 @@ public class ServerTest {
     }
 
     private SimpleDispatcher getClientDispatcher() {
-        return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                                                                         final Channel channel, final Promise<SimpleSession> promise) {
-                return new SimpleSessionNegotiator(promise, channel);
-            }
-        }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+        return new SimpleDispatcher((factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel), new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
     }
 
     private ReconnectStrategy getMockedReconnectStrategy() throws Exception {
@@ -324,14 +241,9 @@ public class ServerTest {
     }
 
     private SimpleDispatcher getServerDispatcher(final Promise<Boolean> p) {
-        return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                                                                         final Channel channel, final Promise<SimpleSession> promise) {
-                p.setSuccess(true);
-                return new SimpleSessionNegotiator(promise, channel);
-            }
+        return new SimpleDispatcher((factory, channel, promise) -> {
+            p.setSuccess(true);
+            return new SimpleSessionNegotiator(promise, channel);
         }, null, serverLoopGroup);
     }