From: Stephen Kitt Date: Tue, 16 May 2017 15:47:17 +0000 (+0200) Subject: protocol-framework: use lambdas X-Git-Tag: release/nitrogen~216 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e2d1c4c0fb80825a35e552c78b13808fa48f9197;hp=503d824302de98ae7d9fd44c6c417ed651865919 protocol-framework: use lambdas This series of patches uses lambdas instead of anonymous classes for functional interfaces when possible. Lambdas are replaced with method references when appropriate. Change-Id: Ife557068a519c0b2822b9ee5dc4b5150373e0cf8 Signed-off-by: Stephen Kitt --- diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java index ec315bc0bd..a883eaf4ea 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java @@ -8,7 +8,6 @@ package org.opendaylight.protocol.framework; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -61,15 +60,12 @@ public abstract class AbstractSessionNegotiator { + if (!f.isSuccess()) { + LOG.info("Failed to send message {}", msg, f.cause()); + negotiationFailed(f.cause()); + } else { + LOG.trace("Message {} sent to socket", msg); } }); } diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index ddf5d438f3..f9fadb04b0 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -11,12 +11,10 @@ import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,24 +44,18 @@ final class ReconnectPromise, L extends SessionList final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts - pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer() { - @Override - public void initializeChannel(final SocketChannel channel, final Promise promise) { - initializer.initializeChannel(channel, promise); - // add closed channel handler - // This handler has to be added as last channel handler and the channel inactive event has to be caught by it - // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work - // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started - channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this)); - } + pending = this.dispatcher.createClient(this.address, cs, b, (channel, promise) -> { + initializer.initializeChannel(channel, promise); + // add closed channel handler + // This handler has to be added as last channel handler and the channel inactive event has to be caught by it + // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work + // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started + channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this)); }); - pending.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(final Future future) throws Exception { - if (!future.isSuccess()) { - ReconnectPromise.this.setFailure(future.cause()); - } + pending.addListener((GenericFutureListener>) future -> { + if (!future.isSuccess()) { + ReconnectPromise.this.setFailure(future.cause()); } }); } diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java index ed60abb05a..6a1607803b 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java @@ -10,7 +10,6 @@ package org.opendaylight.protocol.framework; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -129,16 +128,13 @@ public final class TimedReconnectStrategy implements ReconnectStrategy { this.scheduled = true; // Schedule a task for the right time. It will also clear the flag. - return this.executor.schedule(new Callable() { - @Override - public Void call() throws TimeoutException { - synchronized (lock) { - Preconditions.checkState(TimedReconnectStrategy.this.scheduled); - TimedReconnectStrategy.this.scheduled = false; - } - - return null; + return this.executor.schedule(() -> { + synchronized (lock) { + Preconditions.checkState(TimedReconnectStrategy.this.scheduled); + TimedReconnectStrategy.this.scheduled = false; } + + return null; }, this.lastSleep, TimeUnit.MILLISECONDS); } diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java index 331e94949e..ce82096cfe 100644 --- a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java +++ b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java @@ -17,7 +17,6 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultPromise; @@ -76,13 +75,7 @@ public class ServerTest { final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy(); - this.clientDispatcher.createClient(this.serverAddress, - mockReconnectStrategy, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new); Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class)); } @@ -93,25 +86,14 @@ public class ServerTest { final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy(); - this.clientDispatcher.createClient(this.serverAddress, - mockReconnectStrategy, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new); Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class)); final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); @@ -124,12 +106,7 @@ public class ServerTest { this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); @@ -137,12 +114,7 @@ public class ServerTest { final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy(); this.session = this.clientDispatcher.createClient(this.serverAddress, - reconnectStrategy, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }).get(6, TimeUnit.SECONDS); + reconnectStrategy, SimpleSessionListener::new).get(6, TimeUnit.SECONDS); assertEquals(true, p.get(3, TimeUnit.SECONDS)); @@ -158,12 +130,7 @@ public class ServerTest { this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); @@ -174,12 +141,7 @@ public class ServerTest { doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy(); this.clientDispatcher.createReconnectingClient(this.serverAddress, - reconnectStrategyFactory, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + reconnectStrategyFactory, SimpleSessionListener::new); assertEquals(true, p.get(3, TimeUnit.SECONDS)); shutdownServer(); @@ -193,24 +155,15 @@ public class ServerTest { this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); this.clientDispatcher = getClientDispatcher(); this.session = this.clientDispatcher.createClient(this.serverAddress, - new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }).get(6, TimeUnit.SECONDS); + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6, + TimeUnit.SECONDS); assertEquals(true, p.get(3, TimeUnit.SECONDS)); } @@ -221,32 +174,18 @@ public class ServerTest { this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); this.clientDispatcher = getClientDispatcher(); this.session = this.clientDispatcher.createClient(this.serverAddress, - new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }).get(6, TimeUnit.SECONDS); + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6, + TimeUnit.SECONDS); final Future session = this.clientDispatcher.createClient(this.serverAddress, - new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new); assertFalse(session.isSuccess()); } @@ -256,40 +195,24 @@ public class ServerTest { this.dispatcher = getServerDispatcher(p); - this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new); this.server.get(); - this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory() { - @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - - return new SimpleSessionNegotiator(promise, channel) { + this.clientDispatcher = new SimpleDispatcher( + (factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel) { @Override protected void startNegotiation() throws Exception { negotiationFailed(new IllegalStateException("Negotiation failed")); } - }; - } - }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup); + }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup); final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class); final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy(); doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy(); this.clientDispatcher.createReconnectingClient(this.serverAddress, - reconnectStrategyFactory, new SessionListenerFactory() { - @Override - public SimpleSessionListener getSessionListener() { - return new SimpleSessionListener(); - } - }); + reconnectStrategyFactory, SimpleSessionListener::new); // Reconnect strategy should be consulted at least twice, for initial connect and reconnect attempts after drop @@ -297,13 +220,7 @@ public class ServerTest { } private SimpleDispatcher getClientDispatcher() { - return new SimpleDispatcher(new SessionNegotiatorFactory() { - @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - return new SimpleSessionNegotiator(promise, channel); - } - }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup); + return new SimpleDispatcher((factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel), new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup); } private ReconnectStrategy getMockedReconnectStrategy() throws Exception { @@ -324,14 +241,9 @@ public class ServerTest { } private SimpleDispatcher getServerDispatcher(final Promise p) { - return new SimpleDispatcher(new SessionNegotiatorFactory() { - - @Override - public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, - final Channel channel, final Promise promise) { - p.setSuccess(true); - return new SimpleSessionNegotiator(promise, channel); - } + return new SimpleDispatcher((factory, channel, promise) -> { + p.setSuccess(true); + return new SimpleSessionNegotiator(promise, channel); }, null, serverLoopGroup); }