Added cachedThreadPool in ConnectionAdapter
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
index f91123b84ebc0cab02fb180e347e4bc9b45c8c8e..fe90483daf46ea60a22f1e20c7308e031ccb0286 100644 (file)
@@ -9,6 +9,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -89,6 +91,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
     private SystemNotificationsListener systemListener;
     private boolean disconnectOccured = false;
+    private ExecutorService threadPool;
     
     /**
      * default ctor 
@@ -98,9 +101,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(new ResponseRemovalListener()).build();
+        threadPool = Executors.newCachedThreadPool();
         LOG.info("ConnectionAdapter created");
-    
-    
     }
     
     /**
@@ -234,6 +236,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     
     @Override
     public void consume(final DataObject message) {
+        threadPool.execute(new Runnable() {
+            @Override
+            public void run() {
+                consumeIntern(message);
+            }
+        });
+    }
+
+    protected void consumeIntern(final DataObject message) {
         LOG.debug("Consume msg");
         if (disconnectOccured ) {
             return;
@@ -277,15 +288,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
                 if (rpcFuture != null) {
                     LOG.debug("corresponding rpcFuture found");
-                    new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            List<RpcError> errors = Collections.emptyList();
-                            LOG.debug("before setting rpcFuture");
-                            rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
-                            LOG.debug("after setting rpcFuture");
-                        }
-                    }).start();
+                    List<RpcError> errors = Collections.emptyList();
+                    LOG.debug("before setting rpcFuture");
+                    rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+                    LOG.debug("after setting rpcFuture");
                     responseCache.invalidate(key);
                 } else {
                     LOG.warn("received unexpected rpc response: "+key);