-/**
+/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
*/
package org.opendaylight.openflowplugin.impl.connection;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.Future;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
import org.opendaylight.openflowplugin.impl.util.MessageFactory;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint64;
+import org.opendaylight.yangtools.yang.common.Uint8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Short lastReceivedVersion;
private final List<Short> versionOrder;
-
private final ConnectionAdapter connectionAdapter;
private Short version;
- private ErrorHandler errorHandler;
+ private final ErrorHandler errorHandler;
- private Short highestVersion;
+ private final Short highestVersion;
private Long activeXid;
- private HandshakeListener handshakeListener;
+ private final HandshakeListener handshakeListener;
+
+ private boolean useVersionBitmap; // not final just for unit test
- private boolean useVersionBitmap;
+ private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+ private final int deviceConnectionHoldTime;
+ private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
/**
* Constructor.
* @param connectionAdapter connection adaptor for switch
* @param highestVersion highest openflow version
* @param versionOrder list of version in order for connection protocol negotiation
+ * @param errorHandler the ErrorHandler
+ * @param handshakeListener the HandshakeListener
+ * @param useVersionBitmap should use negotiation bit map
+ * @param deviceConnectionRateLimiter device connection rate limiter utility
+ * @param deviceConnectionHoldTime deivce connection hold time in seconds
+ * @param deviceConnectionStatusProvider utility for maintaining device connection states
*/
- public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder) {
+ public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
+ final List<Short> versionOrder, final ErrorHandler errorHandler,
+ final HandshakeListener handshakeListener, final boolean useVersionBitmap,
+ final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
+ final int deviceConnectionHoldTime,
+ final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
this.highestVersion = highestVersion;
this.versionOrder = versionOrder;
this.connectionAdapter = connectionAdapter;
- }
-
- @Override
- public void setHandshakeListener(HandshakeListener handshakeListener) {
+ this.errorHandler = errorHandler;
this.handshakeListener = handshakeListener;
+ this.useVersionBitmap = useVersionBitmap;
+ this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
+ this.deviceConnectionHoldTime = deviceConnectionHoldTime;
+ this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public synchronized void shake(HelloMessage receivedHello) {
+ public synchronized void shake(final HelloMessage receivedHello) {
if (version != null) {
// Some switches respond with a second HELLO acknowledging our HELLO
}
// process the 2. and later hellos
- Short remoteVersion = receivedHello.getVersion();
+ Uint8 remoteVersion = receivedHello.getVersion();
List<Elements> elements = receivedHello.getElements();
- setActiveXid(receivedHello.getXid());
+ setActiveXid(receivedHello.getXid().toJava());
List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
remoteVersionBitmap);
handleVersionBitmapNegotiation(elements);
} else {
// versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
- handleStepByStepVersionNegotiation(remoteVersion);
+ handleStepByStepVersionNegotiation(remoteVersion.toJava());
}
} catch (Exception ex) {
errorHandler.handleException(ex);
ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
Futures.addCallback(helloResult, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
try {
- stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+ stepByStepVersionSubStep(remoteVersion);
} catch (Exception e) {
errorHandler.handleException(e);
handshakeListener.onHandshakeFailure();
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.info("hello sending seriously failed [{}]", nextHelloXid);
LOG.trace("detail of hello send problem", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
} else {
- stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+ stepByStepVersionSubStep(remoteVersion);
}
}
- private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
- if (remoteVersion.equals(lastProposedVersion)) {
+ private void stepByStepVersionSubStep(final Short remoteVersion) {
+ if (remoteVersion >= lastProposedVersion) {
postHandshake(lastProposedVersion, getNextXid());
LOG.trace("ret - OK - switch answered with lastProposedVersion");
} else {
checkNegotiationStalling(remoteVersion);
- if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
+ if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
// wait for next version
LOG.trace("ret - wait");
} else {
* @param remoteVersion remote version
* @throws Exception exception
*/
- private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
+ private void handleLowerVersionProposal(final Short remoteVersion) {
Short proposedVersion;
// find the version from header version field
proposedVersion = proposeNextVersion(remoteVersion);
* @param elements version elements
* @throws Exception exception
*/
- private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
+ private void handleVersionBitmapNegotiation(final List<Elements> elements) {
final Short proposedVersion = proposeCommonBitmapVersion(elements);
if (lastProposedVersion == null) {
// first hello has not been sent yet
ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
Futures.addCallback(helloDone, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
LOG.trace("ret - DONE - versionBitmap");
postHandshake(proposedVersion, getNextXid());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
// NOOP
}
- });
+ }, MoreExecutors.directExecutor());
LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
} else {
LOG.trace("ret - DONE - versionBitmap");
return activeXid;
}
- private void setActiveXid(Long xid) {
+ private void setActiveXid(final Long xid) {
this.activeXid = xid;
}
*
* @param remoteVersion remove version
*/
- private void checkNegotiationStalling(Short remoteVersion) {
+ private void checkNegotiationStalling(final Short remoteVersion) {
if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
}
* @param list bitmap list
* @return proposed bitmap value
*/
- protected Short proposeCommonBitmapVersion(List<Elements> list) {
+ protected Short proposeCommonBitmapVersion(final List<Elements> list) {
Short supportedHighestVersion = null;
- if ((null != list) && (0 != list.size())) {
+ if (null != list && 0 != list.size()) {
for (Elements element : list) {
List<Boolean> bitmap = element.getVersionBitmap();
// check for version bitmap
* @param remoteVersion openflow version supported by remote entity
* @return openflow version
*/
- protected short proposeNextVersion(short remoteVersion) {
+ protected short proposeNextVersion(final short remoteVersion) {
Short proposal = null;
for (short offer : versionOrder) {
if (offer <= remoteVersion) {
* @param helloVersion initial hello version for openflow connection negotiation
* @param helloXid transaction id
*/
- private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
+ private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
MessageFactory.digVersions(helloInput.getElements()));
- Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
-
- ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
- Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+ Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
@Override
- public void onSuccess(RpcResult<Void> result) {
+ public void onSuccess(final RpcResult<HelloOutput> result) {
if (result.isSuccessful()) {
LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
connectionAdapter.getRemoteAddress());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
connectionAdapter.getRemoteAddress(), throwable.getMessage());
LOG.trace("DETAIL of sending of hello failure:", throwable);
resultFtr.cancel(false);
handshakeListener.onHandshakeFailure();
}
- });
+ }, MoreExecutors.directExecutor());
LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
return resultFtr;
}
GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
featuresBuilder.setVersion(version).setXid(xid);
LOG.debug("sending feature request for version={} and xid={}", version, xid);
- Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
- Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
+ Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
new FutureCallback<RpcResult<GetFeaturesOutput>>() {
@Override
- public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+ public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
LOG.trace("features are back");
if (rpcFeatures.isSuccessful()) {
GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+ final Uint64 dpId = featureOutput.getDatapathId();
+ BigInteger datapathId = dpId == null ? null : dpId.toJava();
+ connectionAdapter.setDatapathId(datapathId);
+ if (datapathId == null || !isAllowedToConnect(datapathId)) {
+ connectionAdapter.disconnect();
+ return;
+ }
+
LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
connectionAdapter.getRemoteAddress(), throwable.getMessage());
LOG.trace("DETAIL of sending of hello failure:", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
LOG.debug("future features [{}] hooked ..", xid);
}
- @Override
- public void setUseVersionBitmap(boolean useVersionBitmap) {
- this.useVersionBitmap = useVersionBitmap;
+ public boolean isAllowedToConnect(BigInteger nodeId) {
+ // The device isn't allowed for connection till device connection hold time is over
+ if (deviceConnectionHoldTime > 0) {
+ LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
+ if (lastConnectionTime == null) {
+ LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
+ + "seconds", nodeId, deviceConnectionHoldTime);
+ deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
+ return false;
+ } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
+ LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
+ + "{}", deviceConnectionHoldTime, nodeId);
+ return false;
+ }
+ }
+
+ if (!deviceConnectionRateLimiter.tryAquire()) {
+ LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
+ connectionAdapter.disconnect();
+ return false;
+ }
+ return true;
}
- @Override
- public void setErrorHandler(ErrorHandler errorHandler) {
- this.errorHandler = errorHandler;
+ /**
+ * Method for unit testing, only.
+ * This method is not thread safe and can only safely be used from a test.
+ */
+ @VisibleForTesting
+ @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
+ void setUseVersionBitmap(final boolean useVersionBitmap) {
+ this.useVersionBitmap = useVersionBitmap;
}
+
}