handshake refactor
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / HandshakeManagerImpl.java
index b108d336380b8429d28a512c53d9a7b19873220e..9d4e34b90bd28e5f16ed6766e01141f93c453bd3 100644 (file)
@@ -7,13 +7,16 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowplugin.ConnectionException;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
@@ -23,6 +26,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,20 +36,19 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HandshakeManagerImpl implements HandshakeManager {
-    
+
     private static final Logger LOG = LoggerFactory
             .getLogger(HandshakeManagerImpl.class);
-    
+
     private Short lastProposedVersion;
     private Short lastReceivedVersion;
     private final List<Short> versionOrder;
-    
-    private HelloMessage receivedHello;
+
+    //private HelloMessage receivedHello;
     private final ConnectionAdapter connectionAdapter;
-    private GetFeaturesOutput features;
     private Short version;
     private ErrorHandler errorHandler;
-    
+
     private long maxTimeout = 8000;
     private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
     private Short highestVersion;
@@ -55,31 +58,26 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private HandshakeListener handshakeListener;
 
     private boolean useVersionBitmap;
-    
-    @Override
-    public void setReceivedHello(HelloMessage receivedHello) {
-        this.receivedHello = receivedHello;
-    }
-    
+
     /**
-     * @param connectionAdapter 
-     * @param highestVersion 
+     * @param connectionAdapter
+     * @param highestVersion
      * @param versionOrder
      */
-    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, 
+    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
             List<Short> versionOrder) {
         this.highestVersion = highestVersion;
         this.versionOrder = versionOrder;
         this.connectionAdapter = connectionAdapter;
     }
-    
+
     @Override
     public void setHandshakeListener(HandshakeListener handshakeListener) {
         this.handshakeListener = handshakeListener;
     }
 
     @Override
-    public void shake() {
+    public synchronized void shake(HelloMessage receivedHello) {
 
         if (version != null) {
             // Some switches respond with a second HELLO acknowledging our HELLO
@@ -91,10 +89,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
         LOG.trace("handshake STARTED");
         setActiveXid(20L);
-        HelloMessage receivedHelloLoc = receivedHello;
-        
+
         try {
-            if (receivedHelloLoc == null) {
+            if (receivedHello == null) {
                 // first Hello sending
                 sendHelloMessage(highestVersion, getNextXid());
                 lastProposedVersion = highestVersion;
@@ -103,50 +100,71 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             // process the 2. and later hellos
-            Short remoteVersion = receivedHelloLoc.getVersion();
-            List<Elements> elements = receivedHelloLoc.getElements();
-            setActiveXid(receivedHelloLoc.getXid());
+            Short remoteVersion = receivedHello.getVersion();
+            List<Elements> elements = receivedHello.getElements();
+            setActiveXid(receivedHello.getXid());
             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
-            LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion, 
-                    remoteVersionBitmap, receivedHelloLoc.getXid());
-        
+            LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
+                    receivedHello.getXid(), remoteVersionBitmap);
+
             if (useVersionBitmap && remoteVersionBitmap != null) {
                 // versionBitmap on both sides -> ONE STEP DECISION
                 handleVersionBitmapNegotiation(elements);
-            } else { 
-                // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying 
+            } else {
+                // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
                 handleStepByStepVersionNegotiation(remoteVersion);
             }
         } catch (Exception ex) {
             errorHandler.handleException(ex, null);
-            connectionAdapter.disconnect();
+            LOG.trace("ret - shake fail - closing");
             handshakeListener.onHandshakeFailure();
-            LOG.trace("ret - shake fail: {}", ex.getMessage());
         }
     }
 
     /**
      * @param remoteVersion
-     * @throws Exception 
+     * @throws Exception
      */
-    private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Exception {
-        LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", 
+    private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
+        LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
                 remoteVersion, lastProposedVersion, highestVersion);
-        
+
         if (lastProposedVersion == null) {
-            // first hello has not been sent yet, send it and either wait for next remote 
+            // first hello has not been sent yet, send it and either wait for next remote
             // version or proceed
             lastProposedVersion = proposeNextVersion(remoteVersion);
-            sendHelloMessage(lastProposedVersion, getNextXid());
+            final Long nextHelloXid = getNextXid();
+            ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
+            Futures.addCallback(helloResult, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    try {
+                        stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+                    } catch (Exception e) {
+                        errorHandler.handleException(e, null);
+                        handshakeListener.onHandshakeFailure();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.info("hello sending seriously failed [{}]", nextHelloXid);
+                    LOG.trace("detail of hello send problem", t);
+                }
+            });
+        } else {
+            stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
         }
-        
+    }
+
+    private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
         if (remoteVersion == lastProposedVersion) {
             postHandshake(lastProposedVersion, getNextXid());
             LOG.trace("ret - OK - switch answered with lastProposedVersion");
         } else {
             checkNegotiationStalling(remoteVersion);
 
-            if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
+            if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
                 // wait for next version
                 LOG.trace("ret - wait");
             } else {
@@ -158,7 +176,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
     /**
      * @param remoteVersion
-     * @throws Exception 
+     * @throws Exception
      */
     private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
         Short proposedVersion;
@@ -167,7 +185,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
         lastProposedVersion = proposedVersion;
         sendHelloMessage(proposedVersion, getNextXid());
 
-        if (proposedVersion != remoteVersion) {
+        if (! Objects.equals(proposedVersion, remoteVersion)) {
             LOG.trace("ret - sent+wait");
         } else {
             LOG.trace("ret - sent+OK");
@@ -177,25 +195,39 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
     /**
      * @param elements
-     * @throws Exception 
+     * @throws Exception
      */
     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
-        Short proposedVersion;
-        proposedVersion = proposeCommonBitmapVersion(elements);
+        final Short proposedVersion = proposeCommonBitmapVersion(elements);
         if (lastProposedVersion == null) {
             // first hello has not been sent yet
-            sendHelloMessage(proposedVersion, getNextXid());
+            Long nexHelloXid = getNextXid();
+            ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
+            Futures.addCallback(helloDone, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.trace("ret - DONE - versionBitmap");
+                    postHandshake(proposedVersion, getNextXid());
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    // NOOP
+                }
+            });
+            LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
+        } else {
+            LOG.trace("ret - DONE - versionBitmap");
+            postHandshake(proposedVersion, getNextXid());
         }
-        postHandshake(proposedVersion, getNextXid());
-        LOG.trace("ret - OK - versionBitmap");
     }
-    
+
     /**
-     * 
+     *
      * @return
      */
     private Long getNextXid() {
-        activeXid += 1; 
+        activeXid += 1;
         return activeXid;
     }
 
@@ -205,7 +237,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private void setActiveXid(Long xid) {
         this.activeXid = xid;
     }
-    
+
     /**
      * @param remoteVersion
      */
@@ -216,11 +248,6 @@ public class HandshakeManagerImpl implements HandshakeManager {
         lastReceivedVersion = remoteVersion;
     }
 
-    @Override
-    public GetFeaturesOutput getFeatures() {
-        return features;
-    }
-    
     @Override
     public Short getVersion() {
         return version;
@@ -245,8 +272,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
                     }
                 }
             }
-            
+
             if(null == supportedHighestVersion) {
+                LOG.trace("versionBitmap: no common version found");
                 throw new IllegalArgumentException("no common version found in versionBitmap");
             }
         }
@@ -273,32 +301,57 @@ public class HandshakeManagerImpl implements HandshakeManager {
         }
         return proposal;
     }
-    
+
     /**
      * send hello reply without versionBitmap
      * @param helloVersion
      * @param helloXid
-     * @throws Exception 
+     * @throws Exception
      */
-    private void sendHelloMessage(Short helloVersion, Long helloXid) throws Exception {
+    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
         //Short highestVersion = ConnectionConductor.versionOrder.get(0);
         //final Long helloXid = 21L;
         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
-        
-        LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", 
+
+        final SettableFuture<Void> resultFtr = SettableFuture.create();
+
+        LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
                 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
-        
-        try {
-            RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit);
-            RpcUtil.smokeRpc(helloResult);
-            LOG.debug("FIRST HELLO sent.");
-        } catch (Exception e) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("FIRST HELLO sent.", e);
+
+        Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
+
+        ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
+        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+            @Override
+            public void onSuccess(RpcResult<Void> result) {
+                if (result.isSuccessful()) {
+                    LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
+                    resultFtr.set(null);
+                } else {
+                    for (RpcError error : result.getErrors()) {
+                        LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
+                                error.getInfo(), error.getSeverity(), error.getMessage(),
+                                connectionAdapter.getRemoteAddress());
+                        if (error.getCause() != null) {
+                            LOG.trace("DETAIL of sending hello failure", error.getCause());
+                        }
+                    }
+                    resultFtr.cancel(false);
+                    handshakeListener.onHandshakeFailure();
+                }
             }
-            handshakeListener.onHandshakeFailure();
-            throw new ConnectionException("FIRST HELLO sending failed because of connection issue.");
-        }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
+                        connectionAdapter.getRemoteAddress(), t.getMessage());
+                LOG.trace("DETAIL of sending of hello failure:", t);
+                resultFtr.cancel(false);
+                handshakeListener.onHandshakeFailure();
+            }
+        });
+        LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
+        return resultFtr;
     }
 
 
@@ -306,9 +359,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * after handshake set features, register to session
      * @param proposedVersion
      * @param xid
-     * @throws Exception 
+     * @throws Exception
      */
-    protected void postHandshake(Short proposedVersion, Long xid) throws Exception {
+    protected void postHandshake(final Short proposedVersion, final Long xid) {
         // set version
         version = proposedVersion;
 
@@ -319,44 +372,54 @@ public class HandshakeManagerImpl implements HandshakeManager {
         LOG.debug("sending feature request for version={} and xid={}", version, xid);
         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
                 .getFeatures(featuresBuilder.build());
-        LOG.debug("waiting for features");
-        try {
-            RpcResult<GetFeaturesOutput> rpcFeatures = 
-                    featuresFuture.get(maxTimeout, maxTimeoutUnit);
-            RpcUtil.smokeRpc(rpcFeatures);
-            
-            GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
-            
-            LOG.debug("obtained features: datapathId={}",
-                    featureOutput.getDatapathId());
-            LOG.debug("obtained features: auxiliaryId={}",
-                    featureOutput.getAuxiliaryId());
-            LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", 
-                    version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
-            
-            handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
-        } catch (TimeoutException e) {
-            // handshake failed
-            LOG.warn("issuing disconnect during handshake, reason: future expired", e);
-            connectionAdapter.disconnect();
-            handshakeListener.onHandshakeFailure();
-            throw e;
-        } catch (Exception e) {
-            // handshake failed
-            LOG.warn("issuing disconnect during handshake, reason - RPC: {}", e.getMessage(), e);
-            connectionAdapter.disconnect();
-            handshakeListener.onHandshakeFailure();
-            throw e;
-        }
-        
-        LOG.debug("postHandshake DONE");
+
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
+                new FutureCallback<RpcResult<GetFeaturesOutput>>() {
+                    @Override
+                    public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+                        LOG.trace("features are back");
+                        if (rpcFeatures.isSuccessful()) {
+                            GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+
+                            LOG.debug("obtained features: datapathId={}",
+                                    featureOutput.getDatapathId());
+                            LOG.debug("obtained features: auxiliaryId={}",
+                                    featureOutput.getAuxiliaryId());
+                            LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
+                                    version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
+                            handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
+                        } else {
+                            // handshake failed
+                            LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
+                            for (RpcError rpcError : rpcFeatures.getErrors()) {
+                                LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
+                                        rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
+                                        rpcError.getCause()
+                                );
+                            }
+                            handshakeListener.onHandshakeFailure();
+                        }
+
+                        LOG.debug("postHandshake DONE");
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
+                                connectionAdapter.getRemoteAddress(), t.getMessage());
+                        LOG.trace("DETAIL of sending of hello failure:", t);
+                    }
+                });
+
+        LOG.debug("future features [{}] hooked ..", xid);
+
     }
 
     @Override
     public void setUseVersionBitmap(boolean useVersionBitmap) {
         this.useVersionBitmap = useVersionBitmap;
     }
-    
+
     @Override
     public void setErrorHandler(ErrorHandler errorHandler) {
         this.errorHandler = errorHandler;