handshake refactor 70/17770/1
authorMartin Bobak <mbobak@cisco.com>
Sat, 4 Apr 2015 19:35:34 +0000 (21:35 +0200)
committerMartin Bobak <mbobak@cisco.com>
Sat, 4 Apr 2015 20:27:11 +0000 (22:27 +0200)
 - handshake hijacks netty threads
 - only first hello message is sent using own thread
 - there is no blocking - if there is no response from
   device then handshake wont continue
 - tests adapted to hijack

Change-Id: I45798e280bba097abe6f4152797c348b97787a61
Signed-off-by: Martin Bobak <mbobak@cisco.com>
14 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/HandshakeContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/core/HandshakeListener.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/md/core/HandshakeManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/HandshakeContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/ConnectionReadyListenerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/OpenflowProtocolListenerInitialImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeStepWrapper.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java

index 8feeff6dea79e8caf12641b7f4ad495ebac95875..2ffb510c3892b972ce042db4e3af1af5e1093ad1 100644 (file)
@@ -8,13 +8,12 @@
 package org.opendaylight.openflowplugin.api.openflow.connection;
 
 import java.util.concurrent.ThreadPoolExecutor;
-
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
 
 /**
  * OF handshake context holder
  */
-public interface HandshakeContext {
+public interface HandshakeContext extends AutoCloseable {
 
     /**
      * @return handshakeManager
index d104b9cabb916ec18249ee79bd4cfb14307f2b8c..deb2efc34170a443963aa93c63311d21fd1bf1ed 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.openflowplugin.api.openflow.md.core;
 
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 
 /**
@@ -14,7 +15,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
  *
  */
 public interface HandshakeListener {
-    
+
     /**
      * @param featureOutput obtained
      * @param version negotiated
@@ -27,4 +28,8 @@ public interface HandshakeListener {
      */
     void onHandshakeFailure();
 
+    /**
+     * @param handshakeContext
+     */
+    void setHandshakeContext(HandshakeContext handshakeContext);
 }
index fd8a9eb9457932052f403eebe5b9b1d3cafa7328..5b7b70ac3606801a651ca442242428cfef659807 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.openflowplugin.api.openflow.md.core;
 
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
 
 /**
@@ -20,16 +19,6 @@ public interface HandshakeManager {
      */
     Short getVersion();
 
-    /**
-     * @return obtained connection features
-     */
-    GetFeaturesOutput getFeatures();
-
-    /**
-     * @param receivedHello from switch
-     */
-    void setReceivedHello(HelloMessage receivedHello);
-
     /**
      * @param errorHandler the errorHandler to set
      */
@@ -46,7 +35,8 @@ public interface HandshakeManager {
     void setUseVersionBitmap(boolean isBitmapNegotiationEnable);
 
     /**
+     * @param receivedHello message from device we need to act upon
      * process current handshake step
      */
-    void shake();
+    void shake(HelloMessage receivedHello);
 }
index 4ff951a456e4aed35c7b8f9e089aa473e8bed9ef..2bab423bed457fedb2e74513eb1d6505eba98a5d 100644 (file)
@@ -7,12 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.connection;
 
-import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandlerSimpleImpl;
-
 import java.net.InetAddress;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -27,6 +24,7 @@ import org.opendaylight.openflowplugin.impl.connection.listener.ConnectionReadyL
 import org.opendaylight.openflowplugin.impl.connection.listener.HandshakeListenerImpl;
 import org.opendaylight.openflowplugin.impl.connection.listener.OpenflowProtocolListenerInitialImpl;
 import org.opendaylight.openflowplugin.impl.connection.listener.SystemNotificationsListenerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandlerSimpleImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.HandshakeManagerImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
@@ -46,7 +44,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
     @Override
     public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
-        LOG.trace("preparing handshake");
+        LOG.trace("preparing handshake: {}", connectionAdapter.getRemoteAddress());
 
         final int handshakeThreadLimit = 1; //TODO: move to constants/parametrize
         final ThreadPoolLoggingExecutor handshakePool = createHandshakePool(
@@ -60,6 +58,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
         LOG.trace("prepare handshake context");
         HandshakeContext handshakeContext = new HandshakeContextImpl(handshakePool, handshakeManager);
+        handshakeListener.setHandshakeContext(handshakeContext);
 
         LOG.trace("prepare connection listeners");
         final ConnectionReadyListener connectionReadyListener = new ConnectionReadyListenerImpl(
@@ -73,7 +72,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
         final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext);
         connectionAdapter.setSystemListener(systemListener);
 
-        LOG.trace("connection balet finished");
+        LOG.trace("connection ballet finished");
     }
 
     /**
index adf3385b606929adada03babf7e152e4c2dce246..c6f501b417749bf4a3635f46f91e3f91f6b60253 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- * 
+ *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -8,15 +8,19 @@
 package org.opendaylight.openflowplugin.impl.connection;
 
 import java.util.concurrent.ThreadPoolExecutor;
-
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class HandshakeContextImpl implements HandshakeContext {
 
+    private static Logger LOG = LoggerFactory.getLogger(HandshakeContextImpl.class);
+
     private ThreadPoolExecutor handshakePool;
     private HandshakeManager handshakeManager;
 
@@ -39,4 +43,24 @@ public class HandshakeContextImpl implements HandshakeContext {
         return handshakePool;
     }
 
+    @Override
+    public void close() throws Exception {
+        shutdownPoolPolitely();
+    }
+
+    private void shutdownPoolPolitely() {
+        LOG.debug("terminating handshake pool");
+        handshakePool.shutdown();
+        try {
+            handshakePool.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
+        } finally {
+            handshakePool.purge();
+            if (! handshakePool.isTerminated()) {
+                handshakePool.shutdownNow();
+            }
+            LOG.debug("pool is terminated: {}", handshakePool.isTerminated());
+        }
+    }
 }
index d35d19d59d06e8b4d87d1039e757dde179eca92b..38dab4305a43e30bb61247b69dd01f5694807c38 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- * 
+ *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -18,9 +18,9 @@ import org.slf4j.LoggerFactory;
  * oneshot listener - once connection is ready, initiate handshake (if not already started by device)
  */
 public class ConnectionReadyListenerImpl implements ConnectionReadyListener {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionReadyListenerImpl.class);
-    
+
     private ConnectionContext connectionContext;
     private HandshakeContext handshakeContext;
 
@@ -36,7 +36,9 @@ public class ConnectionReadyListenerImpl implements ConnectionReadyListener {
 
     @Override
     public void onConnectionReady() {
-        LOG.debug("device is connected and ready-to-use (pipeline prepared)");
+        LOG.debug("device is connected and ready-to-use (pipeline prepared): {}",
+                connectionContext.getConnectionAdapter().getRemoteAddress());
+
         if (connectionContext.getConnectionState() == null) {
             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
                     null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
index 9a254a2ade57158d4ecc4a154f5e33e34285621b..fdb1c27f267663a20b4c9071a8bccb6fcb0c70ef 100644 (file)
@@ -7,11 +7,11 @@
  */
 package org.opendaylight.openflowplugin.impl.connection.listener;
 
-import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
-
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
+import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +25,7 @@ public class HandshakeListenerImpl implements HandshakeListener {
 
     private ConnectionContext connectionContext;
     private DeviceConnectedHandler deviceConnectedHandler;
+    private HandshakeContext handshakeContext;
 
     /**
      * @param connectionContext
@@ -47,6 +48,16 @@ public class HandshakeListenerImpl implements HandshakeListener {
     public void onHandshakeFailure() {
         LOG.info("handshake failed: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
         connectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
-        // TODO ensure that connection is closed
+        try {
+            handshakeContext.close();
+        } catch (Exception e) {
+            LOG.warn("Closing handshake context failed: {}", e.getMessage());
+            LOG.debug("Detail in hanshake context close:", e);
+        }
+    }
+
+    @Override
+    public void setHandshakeContext(HandshakeContext handshakeContext) {
+        this.handshakeContext = handshakeContext;
     }
 }
index c667793eb77f4f91c685bd0061703b827e099384..a69696a6734d0d06753aa2c8ed2055af1fefa231 100644 (file)
@@ -11,7 +11,16 @@ import com.google.common.base.Objects;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.openflow.md.core.HandshakeStepWrapper;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +81,9 @@ public class OpenflowProtocolListenerInitialImpl implements OpenflowProtocolList
         if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) {
             final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
                     hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
-            handshakeContext.getHandshakePool().submit(handshakeStepWrapper);
+            //handshakeContext.getHandshakePool().submit(handshakeStepWrapper);
+            // use up netty thread
+            handshakeStepWrapper.run();
         } else {
             //TODO: consider disconnecting of bad behaving device
         }
index e1d0df49ea8e51df9b97c9f7c7d9a4140a64433e..a7c8ee63aeeb1a7f408b64fa50f0c595e00aad50 100644 (file)
@@ -8,14 +8,16 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
@@ -65,8 +67,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.Futures;
-
 /**
  * @author mirehak
  */
@@ -111,6 +111,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     private int conductorId;
 
     private int ingressMaxQueueSize;
+    private HandshakeContext handshakeContext;
 
     /**
      * @param connectionAdapter
@@ -456,7 +457,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
-            Short negotiatedVersion) {
+                                       Short negotiatedVersion) {
         postHandshakeBasic(featureOutput, negotiatedVersion);
 
         // post-handshake actions
@@ -477,7 +478,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     /**
      * used by tests
-     * 
+     *
      * @param featureOutput
      * @param negotiatedVersion
      */
@@ -567,31 +568,29 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
     }
 
-    protected void shutdownPool() {
-        hsPool.shutdownNow();
-        LOG.debug("pool is terminated: {}", hsPool.isTerminated());
+    @Override
+    public void setId(int conductorId) {
+        this.conductorId = conductorId;
     }
 
-    protected void shutdownPoolPolitely() {
-        hsPool.shutdown();
+    @Override
+    public void close() {
+        conductorState = CONDUCTOR_STATE.RIP;
         try {
-            hsPool.awaitTermination(1, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
-            shutdownPool();
+            handshakeContext.close();
+        } catch (Exception e) {
+            LOG.warn("Closing handshake context failed: {}", e.getMessage());
+            LOG.debug("Detail in hanshake context close:", e);
         }
-        hsPool.purge();
-        LOG.debug("pool is terminated: {}", hsPool.isTerminated());
     }
 
     @Override
-    public void setId(int conductorId) {
-        this.conductorId = conductorId;
+    public void setHandshakeContext(HandshakeContext handshakeContext) {
+        this.handshakeContext = handshakeContext;
     }
 
-    @Override
-    public void close() {
-        shutdownPoolPolitely();
-        conductorState = CONDUCTOR_STATE.RIP;
+    @VisibleForTesting
+    ThreadPoolExecutor getHsPool() {
+        return hsPool;
     }
 }
index b108d336380b8429d28a512c53d9a7b19873220e..9d4e34b90bd28e5f16ed6766e01141f93c453bd3 100644 (file)
@@ -7,13 +7,16 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowplugin.ConnectionException;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
@@ -23,6 +26,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,20 +36,19 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HandshakeManagerImpl implements HandshakeManager {
-    
+
     private static final Logger LOG = LoggerFactory
             .getLogger(HandshakeManagerImpl.class);
-    
+
     private Short lastProposedVersion;
     private Short lastReceivedVersion;
     private final List<Short> versionOrder;
-    
-    private HelloMessage receivedHello;
+
+    //private HelloMessage receivedHello;
     private final ConnectionAdapter connectionAdapter;
-    private GetFeaturesOutput features;
     private Short version;
     private ErrorHandler errorHandler;
-    
+
     private long maxTimeout = 8000;
     private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
     private Short highestVersion;
@@ -55,31 +58,26 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private HandshakeListener handshakeListener;
 
     private boolean useVersionBitmap;
-    
-    @Override
-    public void setReceivedHello(HelloMessage receivedHello) {
-        this.receivedHello = receivedHello;
-    }
-    
+
     /**
-     * @param connectionAdapter 
-     * @param highestVersion 
+     * @param connectionAdapter
+     * @param highestVersion
      * @param versionOrder
      */
-    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, 
+    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
             List<Short> versionOrder) {
         this.highestVersion = highestVersion;
         this.versionOrder = versionOrder;
         this.connectionAdapter = connectionAdapter;
     }
-    
+
     @Override
     public void setHandshakeListener(HandshakeListener handshakeListener) {
         this.handshakeListener = handshakeListener;
     }
 
     @Override
-    public void shake() {
+    public synchronized void shake(HelloMessage receivedHello) {
 
         if (version != null) {
             // Some switches respond with a second HELLO acknowledging our HELLO
@@ -91,10 +89,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
         LOG.trace("handshake STARTED");
         setActiveXid(20L);
-        HelloMessage receivedHelloLoc = receivedHello;
-        
+
         try {
-            if (receivedHelloLoc == null) {
+            if (receivedHello == null) {
                 // first Hello sending
                 sendHelloMessage(highestVersion, getNextXid());
                 lastProposedVersion = highestVersion;
@@ -103,50 +100,71 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             // process the 2. and later hellos
-            Short remoteVersion = receivedHelloLoc.getVersion();
-            List<Elements> elements = receivedHelloLoc.getElements();
-            setActiveXid(receivedHelloLoc.getXid());
+            Short remoteVersion = receivedHello.getVersion();
+            List<Elements> elements = receivedHello.getElements();
+            setActiveXid(receivedHello.getXid());
             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
-            LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion, 
-                    remoteVersionBitmap, receivedHelloLoc.getXid());
-        
+            LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
+                    receivedHello.getXid(), remoteVersionBitmap);
+
             if (useVersionBitmap && remoteVersionBitmap != null) {
                 // versionBitmap on both sides -> ONE STEP DECISION
                 handleVersionBitmapNegotiation(elements);
-            } else { 
-                // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying 
+            } else {
+                // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
                 handleStepByStepVersionNegotiation(remoteVersion);
             }
         } catch (Exception ex) {
             errorHandler.handleException(ex, null);
-            connectionAdapter.disconnect();
+            LOG.trace("ret - shake fail - closing");
             handshakeListener.onHandshakeFailure();
-            LOG.trace("ret - shake fail: {}", ex.getMessage());
         }
     }
 
     /**
      * @param remoteVersion
-     * @throws Exception 
+     * @throws Exception
      */
-    private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Exception {
-        LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", 
+    private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
+        LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
                 remoteVersion, lastProposedVersion, highestVersion);
-        
+
         if (lastProposedVersion == null) {
-            // first hello has not been sent yet, send it and either wait for next remote 
+            // first hello has not been sent yet, send it and either wait for next remote
             // version or proceed
             lastProposedVersion = proposeNextVersion(remoteVersion);
-            sendHelloMessage(lastProposedVersion, getNextXid());
+            final Long nextHelloXid = getNextXid();
+            ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
+            Futures.addCallback(helloResult, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    try {
+                        stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+                    } catch (Exception e) {
+                        errorHandler.handleException(e, null);
+                        handshakeListener.onHandshakeFailure();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.info("hello sending seriously failed [{}]", nextHelloXid);
+                    LOG.trace("detail of hello send problem", t);
+                }
+            });
+        } else {
+            stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
         }
-        
+    }
+
+    private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
         if (remoteVersion == lastProposedVersion) {
             postHandshake(lastProposedVersion, getNextXid());
             LOG.trace("ret - OK - switch answered with lastProposedVersion");
         } else {
             checkNegotiationStalling(remoteVersion);
 
-            if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
+            if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
                 // wait for next version
                 LOG.trace("ret - wait");
             } else {
@@ -158,7 +176,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
     /**
      * @param remoteVersion
-     * @throws Exception 
+     * @throws Exception
      */
     private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
         Short proposedVersion;
@@ -167,7 +185,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
         lastProposedVersion = proposedVersion;
         sendHelloMessage(proposedVersion, getNextXid());
 
-        if (proposedVersion != remoteVersion) {
+        if (! Objects.equals(proposedVersion, remoteVersion)) {
             LOG.trace("ret - sent+wait");
         } else {
             LOG.trace("ret - sent+OK");
@@ -177,25 +195,39 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
     /**
      * @param elements
-     * @throws Exception 
+     * @throws Exception
      */
     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
-        Short proposedVersion;
-        proposedVersion = proposeCommonBitmapVersion(elements);
+        final Short proposedVersion = proposeCommonBitmapVersion(elements);
         if (lastProposedVersion == null) {
             // first hello has not been sent yet
-            sendHelloMessage(proposedVersion, getNextXid());
+            Long nexHelloXid = getNextXid();
+            ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
+            Futures.addCallback(helloDone, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.trace("ret - DONE - versionBitmap");
+                    postHandshake(proposedVersion, getNextXid());
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    // NOOP
+                }
+            });
+            LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
+        } else {
+            LOG.trace("ret - DONE - versionBitmap");
+            postHandshake(proposedVersion, getNextXid());
         }
-        postHandshake(proposedVersion, getNextXid());
-        LOG.trace("ret - OK - versionBitmap");
     }
-    
+
     /**
-     * 
+     *
      * @return
      */
     private Long getNextXid() {
-        activeXid += 1; 
+        activeXid += 1;
         return activeXid;
     }
 
@@ -205,7 +237,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private void setActiveXid(Long xid) {
         this.activeXid = xid;
     }
-    
+
     /**
      * @param remoteVersion
      */
@@ -216,11 +248,6 @@ public class HandshakeManagerImpl implements HandshakeManager {
         lastReceivedVersion = remoteVersion;
     }
 
-    @Override
-    public GetFeaturesOutput getFeatures() {
-        return features;
-    }
-    
     @Override
     public Short getVersion() {
         return version;
@@ -245,8 +272,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
                     }
                 }
             }
-            
+
             if(null == supportedHighestVersion) {
+                LOG.trace("versionBitmap: no common version found");
                 throw new IllegalArgumentException("no common version found in versionBitmap");
             }
         }
@@ -273,32 +301,57 @@ public class HandshakeManagerImpl implements HandshakeManager {
         }
         return proposal;
     }
-    
+
     /**
      * send hello reply without versionBitmap
      * @param helloVersion
      * @param helloXid
-     * @throws Exception 
+     * @throws Exception
      */
-    private void sendHelloMessage(Short helloVersion, Long helloXid) throws Exception {
+    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
         //Short highestVersion = ConnectionConductor.versionOrder.get(0);
         //final Long helloXid = 21L;
         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
-        
-        LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", 
+
+        final SettableFuture<Void> resultFtr = SettableFuture.create();
+
+        LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
                 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
-        
-        try {
-            RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit);
-            RpcUtil.smokeRpc(helloResult);
-            LOG.debug("FIRST HELLO sent.");
-        } catch (Exception e) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("FIRST HELLO sent.", e);
+
+        Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
+
+        ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
+        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+            @Override
+            public void onSuccess(RpcResult<Void> result) {
+                if (result.isSuccessful()) {
+                    LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
+                    resultFtr.set(null);
+                } else {
+                    for (RpcError error : result.getErrors()) {
+                        LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
+                                error.getInfo(), error.getSeverity(), error.getMessage(),
+                                connectionAdapter.getRemoteAddress());
+                        if (error.getCause() != null) {
+                            LOG.trace("DETAIL of sending hello failure", error.getCause());
+                        }
+                    }
+                    resultFtr.cancel(false);
+                    handshakeListener.onHandshakeFailure();
+                }
             }
-            handshakeListener.onHandshakeFailure();
-            throw new ConnectionException("FIRST HELLO sending failed because of connection issue.");
-        }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
+                        connectionAdapter.getRemoteAddress(), t.getMessage());
+                LOG.trace("DETAIL of sending of hello failure:", t);
+                resultFtr.cancel(false);
+                handshakeListener.onHandshakeFailure();
+            }
+        });
+        LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
+        return resultFtr;
     }
 
 
@@ -306,9 +359,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * after handshake set features, register to session
      * @param proposedVersion
      * @param xid
-     * @throws Exception 
+     * @throws Exception
      */
-    protected void postHandshake(Short proposedVersion, Long xid) throws Exception {
+    protected void postHandshake(final Short proposedVersion, final Long xid) {
         // set version
         version = proposedVersion;
 
@@ -319,44 +372,54 @@ public class HandshakeManagerImpl implements HandshakeManager {
         LOG.debug("sending feature request for version={} and xid={}", version, xid);
         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
                 .getFeatures(featuresBuilder.build());
-        LOG.debug("waiting for features");
-        try {
-            RpcResult<GetFeaturesOutput> rpcFeatures = 
-                    featuresFuture.get(maxTimeout, maxTimeoutUnit);
-            RpcUtil.smokeRpc(rpcFeatures);
-            
-            GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
-            
-            LOG.debug("obtained features: datapathId={}",
-                    featureOutput.getDatapathId());
-            LOG.debug("obtained features: auxiliaryId={}",
-                    featureOutput.getAuxiliaryId());
-            LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", 
-                    version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
-            
-            handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
-        } catch (TimeoutException e) {
-            // handshake failed
-            LOG.warn("issuing disconnect during handshake, reason: future expired", e);
-            connectionAdapter.disconnect();
-            handshakeListener.onHandshakeFailure();
-            throw e;
-        } catch (Exception e) {
-            // handshake failed
-            LOG.warn("issuing disconnect during handshake, reason - RPC: {}", e.getMessage(), e);
-            connectionAdapter.disconnect();
-            handshakeListener.onHandshakeFailure();
-            throw e;
-        }
-        
-        LOG.debug("postHandshake DONE");
+
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
+                new FutureCallback<RpcResult<GetFeaturesOutput>>() {
+                    @Override
+                    public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+                        LOG.trace("features are back");
+                        if (rpcFeatures.isSuccessful()) {
+                            GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+
+                            LOG.debug("obtained features: datapathId={}",
+                                    featureOutput.getDatapathId());
+                            LOG.debug("obtained features: auxiliaryId={}",
+                                    featureOutput.getAuxiliaryId());
+                            LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
+                                    version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
+                            handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
+                        } else {
+                            // handshake failed
+                            LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
+                            for (RpcError rpcError : rpcFeatures.getErrors()) {
+                                LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
+                                        rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
+                                        rpcError.getCause()
+                                );
+                            }
+                            handshakeListener.onHandshakeFailure();
+                        }
+
+                        LOG.debug("postHandshake DONE");
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
+                                connectionAdapter.getRemoteAddress(), t.getMessage());
+                        LOG.trace("DETAIL of sending of hello failure:", t);
+                    }
+                });
+
+        LOG.debug("future features [{}] hooked ..", xid);
+
     }
 
     @Override
     public void setUseVersionBitmap(boolean useVersionBitmap) {
         this.useVersionBitmap = useVersionBitmap;
     }
-    
+
     @Override
     public void setErrorHandler(ErrorHandler errorHandler) {
         this.errorHandler = errorHandler;
index 5d9bcb60e3eed25bce69636c1da820ba0424ffb4..35da23e9f17635e6cff62117ef5425427c48a501 100644 (file)
@@ -19,20 +19,20 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HandshakeStepWrapper implements Runnable {
-    
+
     private static final Logger LOG = LoggerFactory
             .getLogger(HandshakeStepWrapper.class);
-    
+
     private HelloMessage helloMessage;
     private HandshakeManager handshakeManager;
     private ConnectionAdapter connectionAdapter;
-    
-    
-    
+
+
+
     /**
      * @param helloMessage
      * @param handshakeManager
-     * @param connectionAdapter 
+     * @param connectionAdapter
      */
     public HandshakeStepWrapper(HelloMessage helloMessage,
             HandshakeManager handshakeManager, ConnectionAdapter connectionAdapter) {
@@ -44,8 +44,7 @@ public class HandshakeStepWrapper implements Runnable {
     @Override
     public void run() {
         if (connectionAdapter.isAlive()) {
-            handshakeManager.setReceivedHello(helloMessage);
-            handshakeManager.shake();
+            handshakeManager.shake(helloMessage);
         } else {
             LOG.debug("connection is down - skipping handshake step");
         }
index df377d137dd61a81b37637264378915d872b27d0..9c2ee4d41ce1b783cf06034d57662fa682bcf3b8 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.openflowplugin.openflow.md.core;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -30,17 +29,18 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
-import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeaturesV10;
@@ -56,6 +56,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
@@ -101,6 +102,8 @@ public class ConnectionConductorImplTest {
     private int expectedErrors = 0;
     @Mock
     private MessageSpy<DataContainer> messageSpy;
+    @Mock
+    HandshakeContext handshakeContext;
 
     public void incrExperimenterMessageCounter() {
         this.experimenterMessageCounter++;
@@ -162,6 +165,7 @@ public class ConnectionConductorImplTest {
         connectionConductor.setQueueProcessor(queueProcessor);
         connectionConductor.setErrorHandler(errorHandler);
         connectionConductor.init();
+        connectionConductor.setHandshakeContext(handshakeContext);
         eventPlan = new Stack<>();
         adapter.setEventPlan(eventPlan);
         adapter.setProceedTimeout(5000L);
@@ -189,7 +193,7 @@ public class ConnectionConductorImplTest {
             libSimulation.join();
         }
         queueProcessor.shutdown();
-        connectionConductor.shutdownPool();
+        connectionConductor.getHsPool().shutdown();
 
         for (Exception problem : adapter.getOccuredExceptions()) {
             LOG.error("during simulation on adapter side: "
@@ -322,7 +326,7 @@ public class ConnectionConductorImplTest {
                 EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
-                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+                (short) 0x01, getFeatureResponseMsg()));
 
         int i = 1;
         eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
@@ -360,7 +364,7 @@ public class ConnectionConductorImplTest {
                 EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
-                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+                (short) 0x01, getFeatureResponseMsg()));
 
         int i = 1;
         eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
@@ -497,7 +501,7 @@ public class ConnectionConductorImplTest {
      */
     private void executeNow() throws InterruptedException {
         execute(true);
-        connectionConductor.shutdownPool();
+        connectionConductor.getHsPool().shutdown();
     }
 
     /**
@@ -592,7 +596,7 @@ public class ConnectionConductorImplTest {
 
     /**
      * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage)}
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(ExperimenterMessage)}
      * .
      *
      * @throws InterruptedException
@@ -642,7 +646,7 @@ public class ConnectionConductorImplTest {
 
     /**
      * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage)}
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(PortStatus)}
      * <br><br>
      * Tests for getting features from port status message by port version
      * <ul>
@@ -667,13 +671,13 @@ public class ConnectionConductorImplTest {
         PortFeatures featuresMal = new PortFeatures(true, false, false, false, null, false, false, false, false, false, false, false, false, false, false, false);
         PortFeaturesV10 featuresV10 = new PortFeaturesV10(true, false, false, false, false, false, false, false, false, false, false, false);
 
-        //Malformed features           
+        //Malformed features
         builder.setVersion((short) 1).setPortNo(portNumber).setReason(PortReason.OFPPRADD).setCurrentFeatures(featuresMal);
         connectionConductor.processPortStatusMsg(builder.build());
         Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
         Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
 
-        //Version-features mismatch            
+        //Version-features mismatch
         builder.setCurrentFeatures(features);
         connectionConductor.processPortStatusMsg(builder.build());
         Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
index 3dbebfb7c007f450e2f5417ba40193b97e5516e0..deaa7aa002365073d7494091db7a97fab8fcb082 100644 (file)
@@ -7,9 +7,10 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,9 +40,6 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-
 /**
  * testing handshake
  */
@@ -70,7 +68,7 @@ public class HandshakeManagerImplTest {
      */
     @Before
     public void setUp() {
-        handshakeManager = new HandshakeManagerImpl(adapter, OFConstants.OFP_VERSION_1_3, 
+        handshakeManager = new HandshakeManagerImpl(adapter, OFConstants.OFP_VERSION_1_3,
                 ConnectionConductor.versionOrder);
         handshakeManager.setErrorHandler(errorHandler);
         handshakeManager.setHandshakeListener(handshakeListener);
@@ -161,10 +159,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(resultFeatures.getResult(), version);
     }
@@ -182,15 +179,14 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(resultFeatures.getResult(), version);
     }
 
     /**
      * Test of version negotiation Where switch version < 1.0
-     * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful 
+     * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
      * @throws Exception
      */
     @Test
@@ -199,8 +195,7 @@ public class HandshakeManagerImplTest {
         expectedErrors = 1;
         Short version = (short) 0x00;
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -208,7 +203,7 @@ public class HandshakeManagerImplTest {
 
     /**
      * Test of version negotiation Where switch version < 1.0
-     * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful 
+     * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
      * @throws Exception
      */
     @Test
@@ -217,10 +212,9 @@ public class HandshakeManagerImplTest {
         expectedErrors = 1;
         Short version = (short) 0x00;
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -240,11 +234,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), expVersion);
@@ -264,13 +256,11 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), expVersion);
@@ -289,8 +279,7 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -309,10 +298,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -332,11 +320,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), expVersion);
@@ -356,11 +342,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), expVersion);
@@ -377,11 +361,9 @@ public class HandshakeManagerImplTest {
         Short version = (short) 0x06;
         expectedErrors = 1;
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -398,13 +380,11 @@ public class HandshakeManagerImplTest {
         Short version = (short) 0x06;
         expectedErrors = 1;
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
-        handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
-        handshakeManager.shake();
+        handshakeManager.shake(createHelloMessage(version, helloXid).build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -427,8 +407,7 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -451,10 +430,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -477,8 +455,7 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -501,10 +478,9 @@ public class HandshakeManagerImplTest {
         Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
             .thenReturn(Futures.immediateFuture(resultFeatures));
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener).onHandshakeSuccessfull(
                 resultFeatures.getResult(), version);
@@ -525,8 +501,7 @@ public class HandshakeManagerImplTest {
         HelloMessageBuilder helloMessage = createHelloMessage(version, helloXid);
         addVersionBitmap(Lists.newArrayList((short) 0x05, (short) 0x02), helloMessage);
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -547,10 +522,9 @@ public class HandshakeManagerImplTest {
         HelloMessageBuilder helloMessage = createHelloMessage(version, helloXid);
         addVersionBitmap(Lists.newArrayList((short) 0x05, (short) 0x02), helloMessage);
 
-        handshakeManager.shake();
+        handshakeManager.shake(null);
 
-        handshakeManager.setReceivedHello(helloMessage.build());
-        handshakeManager.shake();
+        handshakeManager.shake(helloMessage.build());
 
         Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
                 Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
@@ -571,7 +545,7 @@ public class HandshakeManagerImplTest {
      * @param helloBuilder
      * @return
      */
-    private static HelloMessageBuilder addVersionBitmap(List<Short> versionOrder, 
+    private static HelloMessageBuilder addVersionBitmap(List<Short> versionOrder,
             HelloMessageBuilder helloBuilder) {
         short highestVersion = versionOrder.get(0);
         int elementsCount = highestVersion / Integer.SIZE;
index 9af46b4379ddceea8ec1d33fd8b6b23144b560bf..ec7fe6d079a4dba4e338336c8232787e0876c88e 100644 (file)
@@ -8,6 +8,11 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.plan;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -17,8 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
@@ -67,13 +73,10 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-
 /**
  * @author mirehak
  */
@@ -101,12 +104,14 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     private int planItemCounter;
 
     private boolean autoRead = true;
+    private final ExecutorService pool;
+
 
     /**
      * default ctor
      */
     public ConnectionAdapterStackImpl() {
-        // do nothing
+        pool = Executors.newSingleThreadExecutor();
     }
 
     @Override
@@ -127,21 +132,21 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     @Override
     public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
         checkRpcAndNext(arg0, "echoReply");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
         checkRpcAndNext(arg0, "experimenter");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
         checkRpcAndNext(arg0, "flowMod");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
@@ -180,35 +185,35 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     @Override
     public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
         checkRpcAndNext(arg0, "groupMod");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> hello(HelloInput arg0) {
         checkRpcAndNext(arg0, "helloReply");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
         checkRpcAndNext(arg0, "meterMod");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
         checkRpcAndNext(arg0, "packetOut");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> portMod(PortModInput arg0) {
         checkRpcAndNext(arg0, "portMod");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
@@ -223,21 +228,21 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     @Override
     public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
         checkRpcAndNext(arg0, "setAsync");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
         checkRpcAndNext(arg0, "setConfig");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
         checkRpcAndNext(arg0, "tableMod");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
@@ -360,7 +365,6 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @param rpcInput
      * @param rpcName
-     * @param msgTmp
      * @param switchTestWaitForRpc
      * @return
      */
@@ -527,10 +531,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     private synchronized void processRpcResponse(
             final SwitchTestRcpResponseEvent rpcResponse) {
         OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
-        LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+        LOG.debug("rpc-responding to OF_LISTENER: {}", rpcResponse.getXid());
 
         @SuppressWarnings("unchecked")
-        SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+        final SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
                 .get(rpcResponse.getXid());
 
         if (response != null) {
@@ -545,9 +549,10 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
                         ErrorType.RPC, new Exception(
                                 "rpc response failed (planned behavior)")));
             }
-            RpcResult<?> result = Rpcs.getRpcResult(successful,
+
+            final RpcResult<?> result = Rpcs.getRpcResult(successful,
                     plannedRpcResponseValue, errors);
-            response.set(result);
+            setFutureViaPool(response, result);
         } else {
             String msg = "RpcResponse not expected: xid="
                     + rpcResponse.getXid() + ", "
@@ -560,6 +565,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
     }
 
+    private void setFutureViaPool(final SettableFuture<RpcResult<?>> response, final RpcResult<?> result) {
+        pool.execute(new Runnable() {
+            @Override
+            public void run() {
+                response.set(result);
+            }
+        });
+    }
+
     /**
      * @param arg0
      *            rpc call content
@@ -575,11 +589,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     /**
      * @return rpc future result
      */
-    private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
-        SettableFuture<RpcResult<Void>> result = SettableFuture.create();
-        List<RpcError> errors = Collections.emptyList();
-        result.set(Rpcs.getRpcResult(true, (Void) null, errors));
-        return result;
+    private synchronized ListenableFuture<RpcResult<Void>> createOneWayRpcResult() {
+        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
     /**
@@ -619,14 +630,13 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     @Override
     public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
         checkRpcAndNext(arg0, "multipartRequestInput");
-        SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+        ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
         return result;
     }
 
     @Override
     public InetSocketAddress getRemoteAddress() {
-        // TODO Auto-generated method stub
-        return null;
+        return InetSocketAddress.createUnresolved("unittest-odl.example.org", 4242);
     }
 
     @Override