EventLoopGroup instances moved to constructor parameters for AbstractDispatcher. 44/2544/2
authorMaros Marsalek <mmarsale@cisco.com>
Fri, 8 Nov 2013 10:21:17 +0000 (11:21 +0100)
committerRobert Varga <rovarga@cisco.com>
Fri, 8 Nov 2013 15:48:00 +0000 (16:48 +0100)
Change-Id: I489913f3766e9562ec8cefeae0870fdd88b757c7
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java
framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java
pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/Main.java
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/BundleActivator.java

index a590a630048796ad8fb3ef7e7401a13adb9d5b42..54d75e0bcbd1b7983d87722b815efb0e2f433bae 100644 (file)
@@ -7,14 +7,12 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
-
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
@@ -23,6 +21,8 @@ import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
+import java.net.InetSocketAddress;
+
 /**
  * Implementation of BGPDispatcher.
  */
@@ -31,8 +31,8 @@ public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl,
 
        private final BGPHandlerFactory hf;
 
-       public BGPDispatcherImpl(final BGPMessageFactory parser) {
-               super();
+       public BGPDispatcherImpl(final BGPMessageFactory parser, EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+               super(bossGroup, workerGroup);
                this.hf = new BGPHandlerFactory(parser);
        }
 
index 348ae034c63e82d44b57eae12f172afed9be558c..09caaa913861cb86f52605259bd6c3d6ef91182f 100644 (file)
@@ -7,11 +7,8 @@
  */
 package org.opendaylight.protocol.bgp.testtool;
 
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext;
@@ -24,6 +21,9 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
 /**
  * Starter class for testing.
  */
@@ -54,7 +54,8 @@ public class Main {
        private static final int RECONNECT_MILLIS = 5000;
 
        Main() throws Exception {
-               this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactoryImpl(ServiceLoaderBGPExtensionProviderContext.createConsumerContext().getMessageRegistry()));
+        this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactoryImpl(ServiceLoaderBGPExtensionProviderContext
+                .createConsumerContext().getMessageRegistry()), new NioEventLoopGroup(), new NioEventLoopGroup());
        }
 
        public static void main(final String[] args) throws Exception {
index 24db1cae01269a4ec9e96d6eb24729b9e8387dac..8264d83318b1c584ea14ea413f9e9fe5201c7ed6 100644 (file)
@@ -7,14 +7,13 @@
  */
 package org.opendaylight.protocol.bgp.testtool;
 
+import com.google.common.base.Preconditions;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
-
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext;
@@ -32,7 +31,7 @@ import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
 import org.opendaylight.yangtools.yang.binding.Notification;
 
-import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
 
 public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends AbstractDispatcher<S, L> {
 
@@ -41,6 +40,7 @@ public class BGPSpeakerMock<M, S extends ProtocolSession<M>, L extends SessionLi
 
        public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
                        final DefaultPromise<BGPSessionImpl> defaultPromise) {
+               super(new NioEventLoopGroup(), new NioEventLoopGroup());
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }
index afea289c007f1dfdb8df9aa90f0ce5f622f5397b..c040170e1b3dbbcc50c7341139c37ecdf63389d6 100644 (file)
@@ -21,13 +21,12 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
@@ -38,7 +37,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
                /**
                 * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
                 * method needs to be implemented in protocol specific Dispatchers.
-                * 
+                *
                 * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
                 * @param promise to be passed to {@link SessionNegotiatorFactory}
                 */
@@ -52,18 +51,28 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
        private final EventLoopGroup workerGroup;
 
+
+       /**
+        * Internally creates new instances of NioEventLoopGroup, might deplete system resources and result in Too many open files exception.
+        *
+        * @deprecated use {@link AbstractDispatcher#AbstractDispatcher(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)} instead.
+        */
+       @Deprecated
        protected AbstractDispatcher() {
-               // FIXME: we should get these as arguments
-               this.bossGroup = new NioEventLoopGroup();
-               this.workerGroup = new NioEventLoopGroup();
+               this(new NioEventLoopGroup(),new NioEventLoopGroup());
+       }
+
+       protected AbstractDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+               this.bossGroup = bossGroup;
+               this.workerGroup = workerGroup;
        }
 
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
-        * 
+        *
         * @param address address to which the server should be bound
         * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
-        * 
+        *
         * @return ChannelFuture representing the binding process
         */
        protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
@@ -89,12 +98,12 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
        /**
         * Creates a client.
-        * 
+        *
         * @param address remote address
         * @param connectStrategy Reconnection strategy to be used when initial connection fails
-        * 
+        *
         * @return Future representing the connection process. Its result represents the combined success of TCP connection
-        *         as well as session negotiation.
+        *               as well as session negotiation.
         */
        protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
                final Bootstrap b = new Bootstrap();
@@ -114,13 +123,13 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
 
        /**
         * Creates a client.
-        * 
+        *
         * @param address remote address
         * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
         * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
-        * 
+        *
         * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
-        *         success if it indicates no further attempts should be made and failure if it reports an error
+        *               success if it indicates no further attempts should be made and failure if it reports an error
         */
        protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
                        final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
index bbaf5ca4ac25363605a08d2ea4d8e42a77429c38..68bf1ceb844b64f954c8abf166814cd1dc11c0e7 100644 (file)
@@ -1,16 +1,15 @@
 package org.opendaylight.protocol.framework;
 
+import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
-
-import java.net.InetSocketAddress;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
 
 public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSessionListener> {
 
@@ -38,6 +37,7 @@ public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSe
 
        public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
                        final Promise<SimpleSession> promise) {
+               super(new NioEventLoopGroup(), new NioEventLoopGroup());
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }
index 0e3351fd8b1a0bb2358e1c8157d32abdf455fa9b..05e09aa7add5294651de2a42f8bd591be688f639 100644 (file)
@@ -7,13 +7,11 @@
  */
 package org.opendaylight.protocol.pcep.impl;
 
+import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
@@ -22,7 +20,8 @@ import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.spi.MessageHandlerRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 
 /**
  * Implementation of PCEPDispatcher.
@@ -34,11 +33,13 @@ public class PCEPDispatcherImpl extends AbstractDispatcher<PCEPSessionImpl, PCEP
 
        /**
         * Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
-        * 
+        *
         * @throws IOException if some error occurred during opening the selector
         */
-       public PCEPDispatcherImpl(final MessageHandlerRegistry registry, final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> negotiatorFactory) {
-               super();
+       public PCEPDispatcherImpl(final MessageHandlerRegistry registry,
+                       final SessionNegotiatorFactory<Message, PCEPSessionImpl, PCEPSessionListener> negotiatorFactory,
+                       EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+               super(bossGroup, workerGroup);
                this.snf = Preconditions.checkNotNull(negotiatorFactory);
                this.hf = new PCEPHandlerFactory(registry);
        }
index e8dd2b4820c1ff02915e5dca3f7f9713f25a3370..2855f4f288873df9883241b41655c2773794ecde 100644 (file)
@@ -7,11 +7,8 @@
  */
 package org.opendaylight.protocol.pcep.testtool;
 
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl;
@@ -21,6 +18,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
 public class Main {
 
        private static final Logger logger = LoggerFactory.getLogger(Main.class);
@@ -126,7 +126,9 @@ public class Main {
 
                final Open prefs = spf.getSessionProposal(address, 0);
 
-               final PCEPDispatcherImpl dispatcher = new PCEPDispatcherImpl(PCEPExtensionProviderContextImpl.getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), prefs, 5));
+               final PCEPDispatcherImpl dispatcher = new PCEPDispatcherImpl(PCEPExtensionProviderContextImpl
+                               .getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(
+                               new HashedWheelTimer(), prefs, 5), new NioEventLoopGroup(), new NioEventLoopGroup());
 
                dispatcher.createServer(address, new TestingSessionListenerFactory()).get();
        }
index 88899670795390464833d384aee66e3df2e64e0c..dc846b37516b08727b19cea5eeb7e3421b9c5852 100644 (file)
@@ -7,15 +7,14 @@
  */
 package org.opendaylight.protocol.pcep.testtool;
 
+import com.google.common.base.Preconditions;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
-
-import java.net.InetSocketAddress;
-
 import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
@@ -34,7 +33,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.TlvsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.predundancy.group.id.tlv.PredundancyGroupIdBuilder;
 
-import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
 
 public class PCCMock<M, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends AbstractDispatcher<S, L> {
 
@@ -43,6 +42,7 @@ public class PCCMock<M, S extends ProtocolSession<M>, L extends SessionListener<
 
        public PCCMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
                        final DefaultPromise<PCEPSessionImpl> defaultPromise) {
+               super(new NioEventLoopGroup(), new NioEventLoopGroup());
                this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
                this.factory = Preconditions.checkNotNull(factory);
        }
index 21cb7ffa554d34763deb294dd82b8d2b5f4db53d..7a51170d8e5df730a3b9966dc7539d5f1a4ca7b6 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
+import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 
 import java.net.InetSocketAddress;
@@ -28,7 +30,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
 
 public final class BundleActivator extends AbstractBindingAwareProvider {
        private static final Logger LOG = LoggerFactory.getLogger(BundleActivator.class);
@@ -41,7 +44,10 @@ public final class BundleActivator extends AbstractBindingAwareProvider {
                final InetSocketAddress address = new InetSocketAddress("0.0.0.0", 4189);
                final PCEPSessionProposalFactory spf = new PCEPSessionProposalFactoryImpl(30, 10, true, true, true, true, 0);
                final Open prefs = spf.getSessionProposal(address, 0);
-               final PCEPDispatcher dispatcher = new PCEPDispatcherImpl(PCEPExtensionProviderContextImpl.getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), prefs, 5));
+               final PCEPDispatcher dispatcher = new PCEPDispatcherImpl(PCEPExtensionProviderContextImpl
+                               .getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(
+                               new HashedWheelTimer(), prefs, 5), new NioEventLoopGroup(), new NioEventLoopGroup());
+
                final InstanceIdentifier<Topology> topology = InstanceIdentifier.builder().node(Topology.class).toInstance();
 
                final PCEPTopologyProvider exp = new PCEPTopologyProvider(dispatcher, null, dps, topology);