Cleanup FlowMessageSerializer
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
index 12ac77ff6f96b054459feef67595f2dcd85444d6..36fe52daf8aaf5da8ebeed579494e7d31fce3569 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,27 +7,35 @@
  */
 package org.opendaylight.openflowplugin.impl.connection;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.Future;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint64;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,18 +49,21 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private Short lastReceivedVersion;
     private final List<Short> versionOrder;
 
-
     private final ConnectionAdapter connectionAdapter;
     private Short version;
-    private ErrorHandler errorHandler;
+    private final ErrorHandler errorHandler;
 
-    private Short highestVersion;
+    private final Short highestVersion;
 
     private Long activeXid;
 
-    private HandshakeListener handshakeListener;
+    private final HandshakeListener handshakeListener;
+
+    private boolean useVersionBitmap; // not final just for unit test
 
-    private boolean useVersionBitmap;
+    private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+    private final int deviceConnectionHoldTime;
+    private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
 
     /**
      * Constructor.
@@ -60,21 +71,33 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param connectionAdapter connection adaptor for switch
      * @param highestVersion    highest openflow version
      * @param versionOrder      list of version in order for connection protocol negotiation
+     * @param errorHandler      the ErrorHandler
+     * @param handshakeListener the HandshakeListener
+     * @param useVersionBitmap  should use negotiation bit map
+     * @param deviceConnectionRateLimiter  device connection rate limiter utility
+     * @param deviceConnectionHoldTime  deivce connection hold time in seconds
+     * @param deviceConnectionStatusProvider  utility for maintaining device connection states
      */
-    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder) {
+    public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
+                                final List<Short> versionOrder, final ErrorHandler errorHandler,
+                                final HandshakeListener handshakeListener, final boolean useVersionBitmap,
+                                final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
+                                final int deviceConnectionHoldTime,
+                                final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
         this.highestVersion = highestVersion;
         this.versionOrder = versionOrder;
         this.connectionAdapter = connectionAdapter;
-    }
-
-    @Override
-    public void setHandshakeListener(HandshakeListener handshakeListener) {
+        this.errorHandler = errorHandler;
         this.handshakeListener = handshakeListener;
+        this.useVersionBitmap = useVersionBitmap;
+        this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
+        this.deviceConnectionHoldTime = deviceConnectionHoldTime;
+        this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public synchronized void shake(HelloMessage receivedHello) {
+    public synchronized void shake(final HelloMessage receivedHello) {
 
         if (version != null) {
             // Some switches respond with a second HELLO acknowledging our HELLO
@@ -97,9 +120,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             // process the 2. and later hellos
-            Short remoteVersion = receivedHello.getVersion();
+            Uint8 remoteVersion = receivedHello.getVersion();
             List<Elements> elements = receivedHello.getElements();
-            setActiveXid(receivedHello.getXid());
+            setActiveXid(receivedHello.getXid().toJava());
             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
                       remoteVersionBitmap);
@@ -109,7 +132,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 handleVersionBitmapNegotiation(elements);
             } else {
                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
-                handleStepByStepVersionNegotiation(remoteVersion);
+                handleStepByStepVersionNegotiation(remoteVersion.toJava());
             }
         } catch (Exception ex) {
             errorHandler.handleException(ex);
@@ -137,9 +160,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
             Futures.addCallback(helloResult, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
+                public void onSuccess(final Void result) {
                     try {
-                        stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+                        stepByStepVersionSubStep(remoteVersion);
                     } catch (Exception e) {
                         errorHandler.handleException(e);
                         handshakeListener.onHandshakeFailure();
@@ -147,24 +170,24 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 }
 
                 @Override
-                public void onFailure(Throwable throwable) {
+                public void onFailure(final Throwable throwable) {
                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
                     LOG.trace("detail of hello send problem", throwable);
                 }
-            });
+            }, MoreExecutors.directExecutor());
         } else {
-            stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+            stepByStepVersionSubStep(remoteVersion);
         }
     }
 
-    private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
-        if (remoteVersion.equals(lastProposedVersion)) {
+    private void stepByStepVersionSubStep(final Short remoteVersion) {
+        if (remoteVersion >= lastProposedVersion) {
             postHandshake(lastProposedVersion, getNextXid());
             LOG.trace("ret - OK - switch answered with lastProposedVersion");
         } else {
             checkNegotiationStalling(remoteVersion);
 
-            if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
+            if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
                 // wait for next version
                 LOG.trace("ret - wait");
             } else {
@@ -180,7 +203,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param remoteVersion remote version
      * @throws Exception exception
      */
-    private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
+    private void handleLowerVersionProposal(final Short remoteVersion) {
         Short proposedVersion;
         // find the version from header version field
         proposedVersion = proposeNextVersion(remoteVersion);
@@ -201,7 +224,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param elements version elements
      * @throws Exception exception
      */
-    private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
+    private void handleVersionBitmapNegotiation(final List<Elements> elements) {
         final Short proposedVersion = proposeCommonBitmapVersion(elements);
         if (lastProposedVersion == null) {
             // first hello has not been sent yet
@@ -209,16 +232,16 @@ public class HandshakeManagerImpl implements HandshakeManager {
             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
             Futures.addCallback(helloDone, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
+                public void onSuccess(final Void result) {
                     LOG.trace("ret - DONE - versionBitmap");
                     postHandshake(proposedVersion, getNextXid());
                 }
 
                 @Override
-                public void onFailure(Throwable throwable) {
+                public void onFailure(final Throwable throwable) {
                     // NOOP
                 }
-            });
+            }, MoreExecutors.directExecutor());
             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
         } else {
             LOG.trace("ret - DONE - versionBitmap");
@@ -231,7 +254,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
         return activeXid;
     }
 
-    private void setActiveXid(Long xid) {
+    private void setActiveXid(final Long xid) {
         this.activeXid = xid;
     }
 
@@ -240,7 +263,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      *
      * @param remoteVersion remove version
      */
-    private void checkNegotiationStalling(Short remoteVersion) {
+    private void checkNegotiationStalling(final Short remoteVersion) {
         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
         }
@@ -258,9 +281,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param list bitmap list
      * @return proposed bitmap value
      */
-    protected Short proposeCommonBitmapVersion(List<Elements> list) {
+    protected Short proposeCommonBitmapVersion(final List<Elements> list) {
         Short supportedHighestVersion = null;
-        if ((null != list) && (0 != list.size())) {
+        if (null != list && 0 != list.size()) {
             for (Elements element : list) {
                 List<Boolean> bitmap = element.getVersionBitmap();
                 // check for version bitmap
@@ -288,7 +311,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param remoteVersion openflow version supported by remote entity
      * @return openflow version
      */
-    protected short proposeNextVersion(short remoteVersion) {
+    protected short proposeNextVersion(final short remoteVersion) {
         Short proposal = null;
         for (short offer : versionOrder) {
             if (offer <= remoteVersion) {
@@ -309,7 +332,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param helloVersion initial hello version for openflow connection negotiation
      * @param helloXid     transaction id
      */
-    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
+    private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
 
 
         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
@@ -319,12 +342,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
                   MessageFactory.digVersions(helloInput.getElements()));
 
-        Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
-
-        ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
-        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+        Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
             @Override
-            public void onSuccess(RpcResult<Void> result) {
+            public void onSuccess(final RpcResult<HelloOutput> result) {
                 if (result.isSuccessful()) {
                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
                               connectionAdapter.getRemoteAddress());
@@ -343,14 +363,14 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(final Throwable throwable) {
                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
                 LOG.trace("DETAIL of sending of hello failure:", throwable);
                 resultFtr.cancel(false);
                 handshakeListener.onHandshakeFailure();
             }
-        });
+        }, MoreExecutors.directExecutor());
         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
         return resultFtr;
     }
@@ -371,16 +391,23 @@ public class HandshakeManagerImpl implements HandshakeManager {
         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
         featuresBuilder.setVersion(version).setXid(xid);
         LOG.debug("sending feature request for version={} and xid={}", version, xid);
-        Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
 
-        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
+        Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
                     @Override
-                    public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+                    public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
                         LOG.trace("features are back");
                         if (rpcFeatures.isSuccessful()) {
                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
 
+                            final Uint64 dpId = featureOutput.getDatapathId();
+                            BigInteger datapathId = dpId == null ? null : dpId.toJava();
+                            connectionAdapter.setDatapathId(datapathId);
+                            if (datapathId == null || !isAllowedToConnect(datapathId)) {
+                                connectionAdapter.disconnect();
+                                return;
+                            }
+
                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
@@ -403,22 +430,47 @@ public class HandshakeManagerImpl implements HandshakeManager {
                     }
 
                     @Override
-                    public void onFailure(Throwable throwable) {
+                    public void onFailure(final Throwable throwable) {
                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
                         LOG.trace("DETAIL of sending of hello failure:", throwable);
                     }
-                });
+                }, MoreExecutors.directExecutor());
         LOG.debug("future features [{}] hooked ..", xid);
     }
 
-    @Override
-    public void setUseVersionBitmap(boolean useVersionBitmap) {
-        this.useVersionBitmap = useVersionBitmap;
+    public boolean isAllowedToConnect(BigInteger nodeId) {
+        // The device isn't allowed for connection till device connection hold time is over
+        if (deviceConnectionHoldTime > 0) {
+            LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
+            if (lastConnectionTime == null) {
+                LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
+                        + "seconds", nodeId, deviceConnectionHoldTime);
+                deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
+                return false;
+            } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
+                LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
+                                + "{}", deviceConnectionHoldTime, nodeId);
+                return false;
+            }
+        }
+
+        if (!deviceConnectionRateLimiter.tryAquire()) {
+            LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
+            connectionAdapter.disconnect();
+            return false;
+        }
+        return true;
     }
 
-    @Override
-    public void setErrorHandler(ErrorHandler errorHandler) {
-        this.errorHandler = errorHandler;
+    /**
+     * Method for unit testing, only.
+     * This method is not thread safe and can only safely be used from a test.
+     */
+    @VisibleForTesting
+    @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
+    void setUseVersionBitmap(final boolean useVersionBitmap) {
+        this.useVersionBitmap = useVersionBitmap;
     }
+
 }