Added cachedThreadPool in ConnectionAdapter 75/2075/2
authorMichal Polkorab <michal.polkorab@pantheon.sk>
Tue, 22 Oct 2013 13:46:55 +0000 (15:46 +0200)
committerMichal Polkorab <michal.polkorab@pantheon.sk>
Wed, 23 Oct 2013 11:55:07 +0000 (13:55 +0200)
Signed-off-by: Michal Polkorab <michal.polkorab@pantheon.sk>
Change-Id: Ifb0e0f91e77302d9c2b61600e536a028759654eb

openflow-protocol-api/src/main/yang/openflow-action.yang
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/DelegatingInboundHandler.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/deserialization/factories/MultipartReplyMessageFactory.java
openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/impl/integration/MockPlugin.java
openflow-protocol-it/src/test/resources/log4j.xml
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/ScenarioHandler.java
simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/WaitForMessageEvent.java

index 43157f2b2aa4a4a30596a8a0b4d4006b60829cef..6b10162209e594d67fa7ebe7343efc3c7691f0ac 100644 (file)
@@ -76,11 +76,11 @@ module openflow-action {
         description "";
         base oft:action;
     }
-    
+
     container actions-container {
         uses action-header;
     }
-    
+
     grouping action-header {
         container action {
             leaf type {
@@ -90,5 +90,5 @@ module openflow-action {
             }
         }
     }
-    
+
 }
\ No newline at end of file
index f91123b84ebc0cab02fb180e347e4bc9b45c8c8e..fe90483daf46ea60a22f1e20c7308e031ccb0286 100644 (file)
@@ -9,6 +9,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -89,6 +91,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
     private SystemNotificationsListener systemListener;
     private boolean disconnectOccured = false;
+    private ExecutorService threadPool;
     
     /**
      * default ctor 
@@ -98,9 +101,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(new ResponseRemovalListener()).build();
+        threadPool = Executors.newCachedThreadPool();
         LOG.info("ConnectionAdapter created");
-    
-    
     }
     
     /**
@@ -234,6 +236,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     
     @Override
     public void consume(final DataObject message) {
+        threadPool.execute(new Runnable() {
+            @Override
+            public void run() {
+                consumeIntern(message);
+            }
+        });
+    }
+
+    protected void consumeIntern(final DataObject message) {
         LOG.debug("Consume msg");
         if (disconnectOccured ) {
             return;
@@ -277,15 +288,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
                 if (rpcFuture != null) {
                     LOG.debug("corresponding rpcFuture found");
-                    new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            List<RpcError> errors = Collections.emptyList();
-                            LOG.debug("before setting rpcFuture");
-                            rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
-                            LOG.debug("after setting rpcFuture");
-                        }
-                    }).start();
+                    List<RpcError> errors = Collections.emptyList();
+                    LOG.debug("before setting rpcFuture");
+                    rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+                    LOG.debug("after setting rpcFuture");
                     responseCache.invalidate(key);
                 } else {
                     LOG.warn("received unexpected rpc response: "+key);
index 44366ee32dedb03e59d2c347bfbe7585d9f6e193..9ef3b73e8be8b16d668c278b8f06a621e86c486b 100644 (file)
@@ -34,12 +34,7 @@ public class DelegatingInboundHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, final Object msg)\r
             throws Exception {\r
         LOGGER.debug("Reading");\r
-        new Thread(new Runnable() {\r
-            @Override\r
-            public void run() {\r
-                consumer.consume((DataObject) msg);\r
-            }\r
-        }).start();\r
+        consumer.consume((DataObject) msg);\r
     }\r
     \r
     @Override\r
index c8bfe6e011d20c77d9b775631efb63986049c421..ffa36485441eb2da8bf87ca49e681842d5304c1a 100644 (file)
@@ -103,7 +103,7 @@ public class MultipartReplyMessageFactory implements OFDeserializer<MultipartRep
         rawMessage.skipBytes(PADDING_IN_MULTIPART_REPLY_HEADER);\r
         // TODO - implement body\r
         //mrmb.setBody(rawMessage.readBytes(rawMessage.readableBytes()).array());\r
-        \r
+\r
         switch (builder.getType().getIntValue()) {\r
         case 0:  builder.setMultipartReplyBody(setDesc(rawMessage));\r
                  break;\r
index 40419621e81827e7d56d68ad23ffc2d295180dfd..3ba78a42be5baac6ae612ee057a444cc3cf11b1c 100644 (file)
@@ -141,11 +141,11 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan
         try {\r
             LOGGER.info("mockPlugin: "+System.identityHashCode(this));\r
             Thread.sleep(500);\r
-//            if (adapter != null) {\r
+            if (adapter != null) {\r
                 Future<Boolean> disconnect = adapter.disconnect();\r
                 disconnect.get();\r
                 LOGGER.info("Disconnected");\r
-//            }\r
+            }\r
         } catch (Exception e) {\r
             LOGGER.error(e.getMessage(), e);\r
         }\r
index 23b1b663af6ea90cec0bde764656fb859c5ccbaf..c06081f05e6323f142db13c4dfd4a8676ce0a526 100644 (file)
         <level value="DEBUG" />\r
         <appender-ref ref="console" />\r
     </logger>\r
-    <logger name="org.opendaylight.openflowjava.protocol.impl.clients" additivity="false">\r
-        <level value="DEBUG" />\r
-        <appender-ref ref="console" />\r
-    </logger>\r
 \r
     <root>\r
         <priority value="INFO" />\r
index 9e8a8025d299399f248c79d49b92979d48d2d90d..45c1ef99b9cce2032c0eec2f10229c7484e2d55e 100644 (file)
@@ -65,7 +65,7 @@ public class ScenarioHandler extends Thread {
         }
         LOGGER.info("Scenario finished");
         synchronized (this) {
-            notify();
+            this.notify();
         }
     }
 
index 58529dc10a0ec839d8cf896b1d0da69b6e964edb..d9a38b025f27578ae568930b74dcc066a07b4b6a 100644 (file)
@@ -29,7 +29,7 @@ public class WaitForMessageEvent implements ClientEvent {
             LOGGER.debug("received msg: " + ByteBufUtils.bytesToHexString(headerReceived));
             return false;
         }
-        LOGGER.info("Waitformessageevent - headers are same");
+        LOGGER.info("Headers OK");
         return true;
     }