Remove Objects.{is,non}Null abuse
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
index f1240939eb9c3f9e18a9f5a23415b4f96e1706c5..2e8a92e65f6b5ce12302a09095b15100cde4ba21 100644 (file)
@@ -12,21 +12,25 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -55,6 +59,8 @@ public class HandshakeManagerImpl implements HandshakeManager {
 
     private boolean useVersionBitmap; // not final just for unit test
 
+    private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+
     /**
      * Constructor.
      *
@@ -66,13 +72,15 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param useVersionBitmap  should use negotiation bit map
      */
     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
-            ErrorHandler errorHandler, HandshakeListener handshakeListener, boolean useVersionBitmap) {
+                                ErrorHandler errorHandler, HandshakeListener handshakeListener,
+                                boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
         this.highestVersion = highestVersion;
         this.versionOrder = versionOrder;
         this.connectionAdapter = connectionAdapter;
         this.errorHandler = errorHandler;
         this.handshakeListener = handshakeListener;
         this.useVersionBitmap = useVersionBitmap;
+        this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
     }
 
     @Override
@@ -154,14 +162,14 @@ public class HandshakeManagerImpl implements HandshakeManager {
                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
                     LOG.trace("detail of hello send problem", throwable);
                 }
-            });
+            }, MoreExecutors.directExecutor());
         } else {
             stepByStepVersionSubStep(remoteVersion);
         }
     }
 
-    private void stepByStepVersionSubStep(Short remoteVersion) throws Exception {
-        if (remoteVersion.equals(lastProposedVersion)) {
+    private void stepByStepVersionSubStep(Short remoteVersion) {
+        if (remoteVersion >= lastProposedVersion) {
             postHandshake(lastProposedVersion, getNextXid());
             LOG.trace("ret - OK - switch answered with lastProposedVersion");
         } else {
@@ -183,7 +191,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param remoteVersion remote version
      * @throws Exception exception
      */
-    private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
+    private void handleLowerVersionProposal(Short remoteVersion) {
         Short proposedVersion;
         // find the version from header version field
         proposedVersion = proposeNextVersion(remoteVersion);
@@ -204,7 +212,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param elements version elements
      * @throws Exception exception
      */
-    private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
+    private void handleVersionBitmapNegotiation(List<Elements> elements) {
         final Short proposedVersion = proposeCommonBitmapVersion(elements);
         if (lastProposedVersion == null) {
             // first hello has not been sent yet
@@ -221,7 +229,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 public void onFailure(Throwable throwable) {
                     // NOOP
                 }
-            });
+            }, MoreExecutors.directExecutor());
             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
         } else {
             LOG.trace("ret - DONE - versionBitmap");
@@ -312,7 +320,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
      * @param helloVersion initial hello version for openflow connection negotiation
      * @param helloXid     transaction id
      */
-    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
+    private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
 
 
         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
@@ -322,12 +330,13 @@ public class HandshakeManagerImpl implements HandshakeManager {
         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
                   MessageFactory.digVersions(helloInput.getElements()));
 
-        Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
+        Future<RpcResult<HelloOutput>> helloResult = connectionAdapter.hello(helloInput);
 
-        ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
-        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+        ListenableFuture<RpcResult<HelloOutput>> rpcResultListenableFuture
+                = JdkFutureAdapters.listenInPoolThread(helloResult);
+        Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<HelloOutput>>() {
             @Override
-            public void onSuccess(RpcResult<Void> result) {
+            public void onSuccess(@Nonnull RpcResult<HelloOutput> result) {
                 if (result.isSuccessful()) {
                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
                               connectionAdapter.getRemoteAddress());
@@ -353,7 +362,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                 resultFtr.cancel(false);
                 handshakeListener.onHandshakeFailure();
             }
-        });
+        }, MoreExecutors.directExecutor());
         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
         return resultFtr;
     }
@@ -379,10 +388,16 @@ public class HandshakeManagerImpl implements HandshakeManager {
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
                     @Override
-                    public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+                    public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) {
                         LOG.trace("features are back");
                         if (rpcFeatures.isSuccessful()) {
                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+                            if (!deviceConnectionRateLimiter.tryAquire()) {
+                                LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
+                                        + " the connection from device {}", featureOutput.getDatapathId());
+                                connectionAdapter.disconnect();
+                                return;
+                            }
 
                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
@@ -411,7 +426,7 @@ public class HandshakeManagerImpl implements HandshakeManager {
                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
                         LOG.trace("DETAIL of sending of hello failure:", throwable);
                     }
-                });
+                }, MoreExecutors.directExecutor());
         LOG.debug("future features [{}] hooked ..", xid);
     }