Bug 3356 - Configurable ChannelOutboundQueue size 40/21040/2
authorMichal Polkorab <michal.polkorab@pantheon.sk>
Fri, 22 May 2015 14:30:43 +0000 (16:30 +0200)
committerMichal Polkorab <michal.polkorab@pantheon.sk>
Mon, 25 May 2015 11:25:45 +0000 (13:25 +0200)
Change-Id: I1f623cc89e3f9efc6bc6560cf19b4967d1fed732
Signed-off-by: Michal Polkorab <michal.polkorab@pantheon.sk>
13 files changed:
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionConfiguration.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactory.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterFactoryImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SwitchConnectionProviderImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ChannelInitializerFactory.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFDatagramPacketHandler.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpChannelInitializer.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpChannelInitializer.java
openflow-protocol-impl/src/main/java/org/opendaylight/yang/gen/v1/urn/opendaylight/params/xml/ns/yang/openflow/_switch/connection/provider/impl/rev140328/SwitchConnectionProviderModule.java
openflow-protocol-impl/src/main/yang/openflow-switch-connection-provider-impl.yang
openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializerTest.java
openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/ConnectionConfigurationImpl.java

index 8b3447694391cddca13c10020bd90cc23dc69982..d923c84e21d9e4892b06b6c092f378af0834f991 100644 (file)
@@ -55,4 +55,9 @@ public interface ConnectionConfiguration {
      * @return thread numbers for TcpHandler's eventloopGroups
      */
     public ThreadConfiguration getThreadConfiguration();
+
+    /**
+     * @return size of ChannelOutbounfQueue
+     */
+    public int getOutboundQueueSize();
 }
\ No newline at end of file
index dba298b7dba917c924ece38674809dc11f1be85a..de97b75764ac9705728b0db9007cfef315260952 100644 (file)
@@ -21,8 +21,9 @@ public interface ConnectionAdapterFactory {
 
     /**
      * @param ch
+     * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}
      * @return connection adapter tcp-implementation
      */
-    public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address;
+    public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address, int outboundQueueSize);
 
 }
index 259dd6f571bca5905289da303ce84cc2f54f7626..9cee28f005bfe8496643ee4fa59aed3acfe2b875 100644 (file)
@@ -24,8 +24,8 @@ public class ConnectionAdapterFactoryImpl implements ConnectionAdapterFactory {
      * @return connection adapter tcp-implementation
      */
        @Override
-    public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address) {
-        return new ConnectionAdapterImpl(ch, address);
+    public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address, int outboundQueueSize) {
+        return new ConnectionAdapterImpl(ch, address, outboundQueueSize);
     }
 
 }
index 11e8336033e65cd78ee4daae3c04d91dfaa7d1d7..d93428b40a779e1a60f27fe4907e76d1992767b7 100644 (file)
@@ -86,12 +86,6 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     /** after this time, RPC future response objects will be thrown away (in minutes) */
     public static final int RPC_RESPONSE_EXPIRATION = 1;
 
-    /**
-     * Default depth of write queue, e.g. we allow these many messages
-     * to be queued up before blocking producers.
-     */
-    public static final int DEFAULT_QUEUE_DEPTH = 1024;
-
     private static final Logger LOG = LoggerFactory
             .getLogger(ConnectionAdapterImpl.class);
     private static final Exception QUEUE_FULL_EXCEPTION =
@@ -127,14 +121,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * @param channel the channel to be set - used for communication
      * @param address client address (used only in case of UDP communication,
      *  as there is no need to store address over tcp (stable channel))
+     * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}
      */
-    public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) {
+    public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, int outboundQueueSize) {
         responseCache = CacheBuilder.newBuilder()
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(REMOVAL_LISTENER).build();
         this.channel = Preconditions.checkNotNull(channel);
-        this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH);
+        this.output = new ChannelOutboundQueue(channel, outboundQueueSize);
         output.setAddress(address);
         channel.pipeline().addLast(output);
         LOG.debug("ConnectionAdapter created");
index c2072ba51535c387c4397370192c5c1418ffc30a..aafd386f83a47cc7da3873e355c67e7315f01371 100644 (file)
@@ -141,6 +141,7 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider {
         factory.setTlsConfig(connConfig.getTlsConfiguration());
         factory.setSerializationFactory(serializationFactory);
         factory.setDeserializationFactory(deserializationFactory);
+        factory.setOutboundQueueSize(connConfig.getOutboundQueueSize());
         TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
         if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) {
             server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
index c453b0b092244ed4dc54336457126303bb2baab1..cdf2d302b003a07ba4c5784e109e6f6564c2a37b 100644 (file)
@@ -24,6 +24,7 @@ public class ChannelInitializerFactory {
     private SerializationFactory serializationFactory;\r
     private TlsConfiguration tlsConfig;\r
     private SwitchConnectionHandler switchConnectionHandler;\r
+    private int outboundQueueSize;\r
     \r
     /**\r
      * @return PublishingChannelInitializer that initializes new channels\r
@@ -35,6 +36,7 @@ public class ChannelInitializerFactory {
         initializer.setSerializationFactory(serializationFactory);\r
         initializer.setTlsConfiguration(tlsConfig);\r
         initializer.setSwitchConnectionHandler(switchConnectionHandler);\r
+        initializer.setOutboungQueueSize(outboundQueueSize);\r
         return initializer;\r
     }\r
 \r
@@ -47,6 +49,7 @@ public class ChannelInitializerFactory {
         initializer.setDeserializationFactory(deserializationFactory);\r
         initializer.setSerializationFactory(serializationFactory);\r
         initializer.setSwitchConnectionHandler(switchConnectionHandler);\r
+        initializer.setOutboungQueueSize(outboundQueueSize);\r
         return initializer;\r
     }\r
 \r
@@ -84,4 +87,11 @@ public class ChannelInitializerFactory {
     public void setSwitchConnectionHandler(SwitchConnectionHandler switchConnectionHandler) {\r
         this.switchConnectionHandler = switchConnectionHandler;\r
     }\r
+\r
+    /**\r
+     * @param outboundQueueSize\r
+     */\r
+    public void setOutboundQueueSize(int outboundQueueSize) {\r
+        this.outboundQueueSize = outboundQueueSize;\r
+    }\r
 }
\ No newline at end of file
index 0f1b8236cc52be04f76cb9135f710cead105a886..65146ad2d714d0db3f4591f8c5e650e78917cdc1 100644 (file)
@@ -38,14 +38,17 @@ public class OFDatagramPacketHandler extends MessageToMessageDecoder<DatagramPac
     private static final byte LENGTH_INDEX_IN_HEADER = 2;\r
     private ConnectionAdapterFactory adapterFactory = new ConnectionAdapterFactoryImpl();\r
     private SwitchConnectionHandler connectionHandler;\r
+    private int outboundQueueSize;\r
 \r
     /**\r
      * Default constructor\r
      * @param sch the switchConnectionHandler that decides\r
      * what to do with incomming message / channel\r
+     * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}\r
      */\r
-    public OFDatagramPacketHandler(SwitchConnectionHandler sch) {\r
+    public OFDatagramPacketHandler(SwitchConnectionHandler sch, int outboundQueueSize) {\r
         this.connectionHandler = sch;\r
+        this.outboundQueueSize = outboundQueueSize;\r
     }\r
 \r
     @Override\r
@@ -62,7 +65,7 @@ public class OFDatagramPacketHandler extends MessageToMessageDecoder<DatagramPac
         MessageConsumer consumer = UdpConnectionMap.getMessageConsumer(msg.sender());\r
         if (consumer == null) {\r
             ConnectionFacade connectionFacade =\r
-                    adapterFactory.createConnectionFacade(ctx.channel(), msg.sender());\r
+                    adapterFactory.createConnectionFacade(ctx.channel(), msg.sender(), outboundQueueSize);\r
             connectionHandler.onSwitchConnected(connectionFacade);\r
             connectionFacade.checkListeners();\r
             UdpConnectionMap.addConnection(msg.sender(), connectionFacade);\r
index c1a9c43c702be0e0ac0b6232fd5cfc21c124331f..87f29ea4a6d986eaa10b13327f9cdffc1c4495f1 100644 (file)
@@ -35,6 +35,7 @@ public class TcpChannelInitializer extends ProtocolChannelInitializer<SocketChan
             .getLogger(TcpChannelInitializer.class);
     private final DefaultChannelGroup allChannels;
     private ConnectionAdapterFactory connectionAdapterFactory;
+    private int outboundQueueSize;
 
     /**
      * default ctor
@@ -67,7 +68,7 @@ public class TcpChannelInitializer extends ProtocolChannelInitializer<SocketChan
         LOGGER.info("Incoming connection accepted - building pipeline");
         allChannels.add(ch);
         ConnectionFacade connectionFacade = null;
-        connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null);
+        connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, outboundQueueSize);
         try {
             LOGGER.debug("calling plugin: " + getSwitchConnectionHandler());
             getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
@@ -116,4 +117,11 @@ public class TcpChannelInitializer extends ProtocolChannelInitializer<SocketChan
     public int size() {
         return allChannels.size();
     }
+
+    /**
+     * @param outboundQueueSize
+     */
+    public void setOutboungQueueSize(int outboundQueueSize) {
+        this.outboundQueueSize = outboundQueueSize;
+    }
 }
\ No newline at end of file
index 518714403c2f3cb2d38212e05f4fb3314069c403..dab86c8e6bc56067df2cb42a3dd270a7e1cba766 100644 (file)
@@ -16,10 +16,12 @@ import io.netty.channel.socket.nio.NioDatagramChannel;
  */\r
 public class UdpChannelInitializer extends ProtocolChannelInitializer<NioDatagramChannel> {\r
 \r
+    private int outboundQueueSize;\r
+\r
     @Override\r
     protected void initChannel(NioDatagramChannel ch) throws Exception {\r
         ch.pipeline().addLast(PipelineHandlers.OF_DATAGRAMPACKET_HANDLER.name(),\r
-                new OFDatagramPacketHandler(getSwitchConnectionHandler()));\r
+                new OFDatagramPacketHandler(getSwitchConnectionHandler(), outboundQueueSize));\r
         OFDatagramPacketDecoder ofDatagramPacketDecoder = new OFDatagramPacketDecoder();\r
         ofDatagramPacketDecoder.setDeserializationFactory(getDeserializationFactory());\r
         ch.pipeline().addLast(PipelineHandlers.OF_DATAGRAMPACKET_DECODER.name(),\r
@@ -29,4 +31,11 @@ public class UdpChannelInitializer extends ProtocolChannelInitializer<NioDatagra
         ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofDatagramPacketEncoder);\r
 //        connectionFacade.fireConnectionReadyNotification();\r
     }\r
+\r
+    /**\r
+     * @param outboundQueueSize\r
+     */\r
+    public void setOutboungQueueSize(int outboundQueueSize) {\r
+        this.outboundQueueSize = outboundQueueSize;\r
+    }\r
 }
\ No newline at end of file
index b3cbeb29a936a701535ceb60f5bac6c15904a209..a02fd88c130b2d484994e934c753e18c91750730 100644 (file)
@@ -81,6 +81,7 @@ public final class SwitchConnectionProviderModule extends org.opendaylight.yang.
         final Tls tlsConfig = getTls();
         final Threads threads = getThreads();
         final TransportProtocol transportProtocol = getTransportProtocol();
+        final int queueSize = getOutboundQueueSize();
         
         return new ConnectionConfiguration() {
             @Override
@@ -166,6 +167,10 @@ public final class SwitchConnectionProviderModule extends org.opendaylight.yang.
                     }
                 };
             }
+            @Override
+            public int getOutboundQueueSize() {
+                return queueSize;
+            }
         };
     }
 
index 6afd5c60a1b8aeea650e75b83c6fcee391f6267e..923a59b0fee1e95d8a2ddda79b684e0c20c799c2 100644 (file)
@@ -92,6 +92,11 @@ module openflow-switch-connection-provider-impl {
                     type uint16;
                 }
             }
+            leaf outbound-queue-size {
+                description "Sets size of ChannelOutboundQueue";
+                type uint16;
+                default 1024;
+            }
         }
     }
 }
\ No newline at end of file
index 2a503dd259f434e42157fc01e31e6f5af3647c5b..68386fbb79d15caa789b0771b08f3abc66f56060 100644 (file)
@@ -48,6 +48,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow
  */
 public class PublishingChannelInitializerTest {
 
+    private static final int OUTBOUND_QUEUE_SIZE = 1024;
     @Mock SocketChannel mockSocketCh ;
     @Mock ChannelPipeline mockChPipeline ;
     @Mock SwitchConnectionHandler mockSwConnHandler ;
@@ -75,14 +76,15 @@ public class PublishingChannelInitializerTest {
         pubChInitializer.setSerializationFactory(mockSerializationFactory);
         pubChInitializer.setDeserializationFactory(mockDeserializationFactory);
         pubChInitializer.setSwitchIdleTimeout(1) ;
-        pubChInitializer.getConnectionIterator() ;
+        pubChInitializer.getConnectionIterator();
+        pubChInitializer.setOutboungQueueSize(OUTBOUND_QUEUE_SIZE);
 
         when( mockChGrp.size()).thenReturn(1) ;
         pubChInitializer.setSwitchConnectionHandler( mockSwConnHandler ) ;
 
         inetSockAddr = new InetSocketAddress(InetAddress.getLocalHost(), 8675 ) ;
 
-        when(mockConnAdaptorFactory.createConnectionFacade(mockSocketCh, null))
+        when(mockConnAdaptorFactory.createConnectionFacade(mockSocketCh, null, OUTBOUND_QUEUE_SIZE))
         .thenReturn(mockConnFacade);
         when(mockSocketCh.remoteAddress()).thenReturn(inetSockAddr) ;
         when(mockSocketCh.localAddress()).thenReturn(inetSockAddr) ;
index 47c1889bb51c0e233fbea9ff2591114b3fbdd87e..652c9e947ab0c0f3e3b76a13a170872e4aaa4379 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.T
  */
 public class ConnectionConfigurationImpl implements ConnectionConfiguration {
 
+    private static final int OUTBOUND_QUEUE_SIZE = 1024;
     private InetAddress address;
     private int port;
     private Object transferProtocol;
@@ -93,4 +94,9 @@ public class ConnectionConfigurationImpl implements ConnectionConfiguration {
     public void setThreadConfiguration(ThreadConfiguration threadConfig) {
         this.threadConfig = threadConfig;
     }
+
+    @Override
+    public int getOutboundQueueSize() {
+        return OUTBOUND_QUEUE_SIZE;
+    }
 }
\ No newline at end of file