Cleanup FlowMessageSerializer
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
index c5c60efb75fcd8c3ed0f7b8838bda1bfc3a29e9c..36fe52daf8aaf5da8ebeed579494e7d31fce3569 100644 (file)
@@ -14,10 +14,13 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Objects;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
@@ -31,6 +34,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint64;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +62,8 @@ public class HandshakeManagerImpl implements HandshakeManager {
     private boolean useVersionBitmap; // not final just for unit test
 
     private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+    private final int deviceConnectionHoldTime;
+    private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
 
     /**
      * Constructor.
@@ -67,10 +74,16 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param errorHandler      the ErrorHandler
      * @param handshakeListener the HandshakeListener
      * @param useVersionBitmap  should use negotiation bit map
+     * @param deviceConnectionRateLimiter  device connection rate limiter utility
+     * @param deviceConnectionHoldTime  deivce connection hold time in seconds
+     * @param deviceConnectionStatusProvider  utility for maintaining device connection states
      */
-    public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
-                                ErrorHandler errorHandler, HandshakeListener handshakeListener,
-                                boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
+    public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
+                                final List<Short> versionOrder, final ErrorHandler errorHandler,
+                                final HandshakeListener handshakeListener, final boolean useVersionBitmap,
+                                final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
+                                final int deviceConnectionHoldTime,
+                                final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
         this.highestVersion = highestVersion;
         this.versionOrder = versionOrder;
         this.connectionAdapter = connectionAdapter;
@@ -78,11 +91,13 @@ public class HandshakeManagerImpl implements HandshakeManager {
         this.handshakeListener = handshakeListener;
         this.useVersionBitmap = useVersionBitmap;
         this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
+        this.deviceConnectionHoldTime = deviceConnectionHoldTime;
+        this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public synchronized void shake(HelloMessage receivedHello) {
+    public synchronized void shake(final HelloMessage receivedHello) {
 
         if (version != null) {
             // Some switches respond with a second HELLO acknowledging our HELLO
@@ -105,9 +120,9 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             // process the 2. and later hellos
-            Short remoteVersion = receivedHello.getVersion();
+            Uint8 remoteVersion = receivedHello.getVersion();
             List<Elements> elements = receivedHello.getElements();
-            setActiveXid(receivedHello.getXid());
+            setActiveXid(receivedHello.getXid().toJava());
             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
                       remoteVersionBitmap);
@@ -117,7 +132,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 handleVersionBitmapNegotiation(elements);
             } else {
                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
-                handleStepByStepVersionNegotiation(remoteVersion);
+                handleStepByStepVersionNegotiation(remoteVersion.toJava());
             }
         } catch (Exception ex) {
             errorHandler.handleException(ex);
@@ -145,7 +160,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
             Futures.addCallback(helloResult, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
+                public void onSuccess(final Void result) {
                     try {
                         stepByStepVersionSubStep(remoteVersion);
                     } catch (Exception e) {
@@ -155,7 +170,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 }
 
                 @Override
-                public void onFailure(Throwable throwable) {
+                public void onFailure(final Throwable throwable) {
                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
                     LOG.trace("detail of hello send problem", throwable);
                 }
@@ -165,7 +180,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
         }
     }
 
-    private void stepByStepVersionSubStep(Short remoteVersion) {
+    private void stepByStepVersionSubStep(final Short remoteVersion) {
         if (remoteVersion >= lastProposedVersion) {
             postHandshake(lastProposedVersion, getNextXid());
             LOG.trace("ret - OK - switch answered with lastProposedVersion");
@@ -188,7 +203,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param remoteVersion remote version
      * @throws Exception exception
      */
-    private void handleLowerVersionProposal(Short remoteVersion) {
+    private void handleLowerVersionProposal(final Short remoteVersion) {
         Short proposedVersion;
         // find the version from header version field
         proposedVersion = proposeNextVersion(remoteVersion);
@@ -209,7 +224,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param elements version elements
      * @throws Exception exception
      */
-    private void handleVersionBitmapNegotiation(List<Elements> elements) {
+    private void handleVersionBitmapNegotiation(final List<Elements> elements) {
         final Short proposedVersion = proposeCommonBitmapVersion(elements);
         if (lastProposedVersion == null) {
             // first hello has not been sent yet
@@ -217,13 +232,13 @@ public class HandshakeManagerImpl implements HandshakeManager {
             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
             Futures.addCallback(helloDone, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
+                public void onSuccess(final Void result) {
                     LOG.trace("ret - DONE - versionBitmap");
                     postHandshake(proposedVersion, getNextXid());
                 }
 
                 @Override
-                public void onFailure(Throwable throwable) {
+                public void onFailure(final Throwable throwable) {
                     // NOOP
                 }
             }, MoreExecutors.directExecutor());
@@ -239,7 +254,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
         return activeXid;
     }
 
-    private void setActiveXid(Long xid) {
+    private void setActiveXid(final Long xid) {
         this.activeXid = xid;
     }
 
@@ -248,7 +263,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      *
      * @param remoteVersion remove version
      */
-    private void checkNegotiationStalling(Short remoteVersion) {
+    private void checkNegotiationStalling(final Short remoteVersion) {
         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
         }
@@ -266,7 +281,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param list bitmap list
      * @return proposed bitmap value
      */
-    protected Short proposeCommonBitmapVersion(List<Elements> list) {
+    protected Short proposeCommonBitmapVersion(final List<Elements> list) {
         Short supportedHighestVersion = null;
         if (null != list && 0 != list.size()) {
             for (Elements element : list) {
@@ -296,7 +311,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param remoteVersion openflow version supported by remote entity
      * @return openflow version
      */
-    protected short proposeNextVersion(short remoteVersion) {
+    protected short proposeNextVersion(final short remoteVersion) {
         Short proposal = null;
         for (short offer : versionOrder) {
             if (offer <= remoteVersion) {
@@ -317,7 +332,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param helloVersion initial hello version for openflow connection negotiation
      * @param helloXid     transaction id
      */
-    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
+    private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
 
 
         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
@@ -329,7 +344,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
         Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
             @Override
-            public void onSuccess(RpcResult<HelloOutput> result) {
+            public void onSuccess(final RpcResult<HelloOutput> result) {
                 if (result.isSuccessful()) {
                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
                               connectionAdapter.getRemoteAddress());
@@ -348,7 +363,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
             }
 
             @Override
-            public void onFailure(Throwable throwable) {
+            public void onFailure(final Throwable throwable) {
                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
                 LOG.trace("DETAIL of sending of hello failure:", throwable);
@@ -380,14 +395,15 @@ public class HandshakeManagerImpl implements HandshakeManager {
         Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
                     @Override
-                    public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+                    public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
                         LOG.trace("features are back");
                         if (rpcFeatures.isSuccessful()) {
                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
-                            connectionAdapter.setDatapathId(featureOutput.getDatapathId());
-                            if (!deviceConnectionRateLimiter.tryAquire()) {
-                                LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
-                                        + " the connection from device {}", featureOutput.getDatapathId());
+
+                            final Uint64 dpId = featureOutput.getDatapathId();
+                            BigInteger datapathId = dpId == null ? null : dpId.toJava();
+                            connectionAdapter.setDatapathId(datapathId);
+                            if (datapathId == null || !isAllowedToConnect(datapathId)) {
                                 connectionAdapter.disconnect();
                                 return;
                             }
@@ -414,7 +430,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                     }
 
                     @Override
-                    public void onFailure(Throwable throwable) {
+                    public void onFailure(final Throwable throwable) {
                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
                         LOG.trace("DETAIL of sending of hello failure:", throwable);
@@ -423,13 +439,37 @@ public class HandshakeManagerImpl implements HandshakeManager {
         LOG.debug("future features [{}] hooked ..", xid);
     }
 
+    public boolean isAllowedToConnect(BigInteger nodeId) {
+        // The device isn't allowed for connection till device connection hold time is over
+        if (deviceConnectionHoldTime > 0) {
+            LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
+            if (lastConnectionTime == null) {
+                LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
+                        + "seconds", nodeId, deviceConnectionHoldTime);
+                deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
+                return false;
+            } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
+                LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
+                                + "{}", deviceConnectionHoldTime, nodeId);
+                return false;
+            }
+        }
+
+        if (!deviceConnectionRateLimiter.tryAquire()) {
+            LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
+            connectionAdapter.disconnect();
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Method for unit testing, only.
      * This method is not thread safe and can only safely be used from a test.
      */
     @VisibleForTesting
     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
-    void setUseVersionBitmap(boolean useVersionBitmap) {
+    void setUseVersionBitmap(final boolean useVersionBitmap) {
         this.useVersionBitmap = useVersionBitmap;
     }