* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
*/
-public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> {
protected interface PipelineInitializer<S extends ProtocolSession<?>> {
/**
private final EventLoopGroup workerGroup;
-
- /**
- * Internally creates new instances of NioEventLoopGroup, might deplete system resources and result in Too many open files exception.
- *
- * @deprecated use {@link AbstractDispatcher#AbstractDispatcher(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)} instead.
- */
- @Deprecated
- protected AbstractDispatcher() {
- this(new NioEventLoopGroup(),new NioEventLoopGroup());
- }
-
protected AbstractDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
}
- @Override
- public void close() {
- try {
- this.workerGroup.shutdownGracefully();
- } finally {
- this.bossGroup.shutdownGracefully();
- }
- }
}
*/
package org.opendaylight.protocol.framework;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class ServerTest {
SimpleDispatcher clientDispatcher, dispatcher;
ChannelFuture server = null;
InetSocketAddress serverAddress;
+ private NioEventLoopGroup eventLoopGroup;
+
@Before
public void setUp() {
final int port = 10000 + (int)(10000 * Math.random());
serverAddress = new InetSocketAddress("127.0.0.5", port);
+ eventLoopGroup = new NioEventLoopGroup();
}
@Test
p.setSuccess(true);
return new SimpleSessionNegotiator(promise, channel);
}
- }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
@Override
final Channel channel, final Promise<SimpleSession> promise) {
return new SimpleSessionNegotiator(promise, channel);
}
- }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
p.setSuccess(true);
return new SimpleSessionNegotiator(promise, channel);
}
- }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
@Override
final Channel channel, final Promise<SimpleSession> promise) {
return new SimpleSessionNegotiator(promise, channel);
}
- }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws IOException, InterruptedException {
this.server.channel().close();
- this.dispatcher.close();
- this.clientDispatcher.close();
+ this.eventLoopGroup.shutdownGracefully();
try {
- Thread.sleep(100);
+ Thread.sleep(500);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
private final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory;
private final ProtocolHandlerFactory<?> factory;
+
private final class SimplePipelineInitializer implements PipelineInitializer<SimpleSession> {
final SessionListenerFactory<SimpleSessionListener> listenerFactory;
}
public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
- final Promise<SimpleSession> promise) {
- super(new NioEventLoopGroup(), new NioEventLoopGroup());
+ final Promise<SimpleSession> promise, EventLoopGroup eventLoopGroup) {
+ super(eventLoopGroup, eventLoopGroup);
this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
this.factory = Preconditions.checkNotNull(factory);
}
public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
}
+
}