import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
- this.clientDispatcher.createClient(this.serverAddress,
- mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
}
final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
- this.clientDispatcher.createClient(this.serverAddress,
- mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.dispatcher = getServerDispatcher(p);
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
this.server.get();
this.dispatcher = getServerDispatcher(p);
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
this.server.get();
final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
this.session = this.clientDispatcher.createClient(this.serverAddress,
- reconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- }).get(6, TimeUnit.SECONDS);
+ reconnectStrategy, SimpleSessionListener::new).get(6, TimeUnit.SECONDS);
assertEquals(true, p.get(3, TimeUnit.SECONDS));
this.dispatcher = getServerDispatcher(p);
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
this.server.get();
doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
this.clientDispatcher.createReconnectingClient(this.serverAddress,
- reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ reconnectStrategyFactory, SimpleSessionListener::new);
assertEquals(true, p.get(3, TimeUnit.SECONDS));
shutdownServer();
this.dispatcher = getServerDispatcher(p);
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
this.server.get();
this.clientDispatcher = getClientDispatcher();
this.session = this.clientDispatcher.createClient(this.serverAddress,
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- }).get(6, TimeUnit.SECONDS);
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+ TimeUnit.SECONDS);
assertEquals(true, p.get(3, TimeUnit.SECONDS));
}
this.dispatcher = getServerDispatcher(p);
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
this.server.get();
this.clientDispatcher = getClientDispatcher();
this.session = this.clientDispatcher.createClient(this.serverAddress,
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- }).get(6, TimeUnit.SECONDS);
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+ TimeUnit.SECONDS);
final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new);
assertFalse(session.isSuccess());
}
+ @Test
+ public void testNegotiationFailedReconnect() throws Exception {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = new SimpleDispatcher(
+ (factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel) {
+ @Override
+ protected void startNegotiation() throws Exception {
+ negotiationFailed(new IllegalStateException("Negotiation failed"));
+ }
+ }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+ final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+ final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+ doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+ this.clientDispatcher.createReconnectingClient(this.serverAddress,
+ reconnectStrategyFactory, SimpleSessionListener::new);
+
+
+ // Reconnect strategy should be consulted at least twice, for initial connect and reconnect attempts after drop
+ verify(reconnectStrategyFactory, timeout((int) TimeUnit.MINUTES.toMillis(3)).atLeast(2)).createReconnectStrategy();
+ }
+
private SimpleDispatcher getClientDispatcher() {
- return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
- @Override
- public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
- final Channel channel, final Promise<SimpleSession> promise) {
- return new SimpleSessionNegotiator(promise, channel);
- }
- }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+ return new SimpleDispatcher((factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel), new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
}
private ReconnectStrategy getMockedReconnectStrategy() throws Exception {
}
private SimpleDispatcher getServerDispatcher(final Promise<Boolean> p) {
- return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-
- @Override
- public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
- final Channel channel, final Promise<SimpleSession> promise) {
- p.setSuccess(true);
- return new SimpleSessionNegotiator(promise, channel);
- }
+ return new SimpleDispatcher((factory, channel, promise) -> {
+ p.setSuccess(true);
+ return new SimpleSessionNegotiator(promise, channel);
}, null, serverLoopGroup);
}