-/**
+/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.math.BigInteger;
+import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint64;
+import org.opendaylight.yangtools.yang.common.Uint8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private boolean useVersionBitmap; // not final just for unit test
private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+ private final int deviceConnectionHoldTime;
+ private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
/**
* Constructor.
* @param errorHandler the ErrorHandler
* @param handshakeListener the HandshakeListener
* @param useVersionBitmap should use negotiation bit map
+ * @param deviceConnectionRateLimiter device connection rate limiter utility
+ * @param deviceConnectionHoldTime deivce connection hold time in seconds
+ * @param deviceConnectionStatusProvider utility for maintaining device connection states
*/
- public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
- ErrorHandler errorHandler, HandshakeListener handshakeListener,
- boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
+ public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
+ final List<Short> versionOrder, final ErrorHandler errorHandler,
+ final HandshakeListener handshakeListener, final boolean useVersionBitmap,
+ final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
+ final int deviceConnectionHoldTime,
+ final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
this.highestVersion = highestVersion;
this.versionOrder = versionOrder;
this.connectionAdapter = connectionAdapter;
this.handshakeListener = handshakeListener;
this.useVersionBitmap = useVersionBitmap;
this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
+ this.deviceConnectionHoldTime = deviceConnectionHoldTime;
+ this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public synchronized void shake(HelloMessage receivedHello) {
+ public synchronized void shake(final HelloMessage receivedHello) {
if (version != null) {
// Some switches respond with a second HELLO acknowledging our HELLO
}
// process the 2. and later hellos
- Short remoteVersion = receivedHello.getVersion();
+ Uint8 remoteVersion = receivedHello.getVersion();
List<Elements> elements = receivedHello.getElements();
- setActiveXid(receivedHello.getXid());
+ setActiveXid(receivedHello.getXid().toJava());
List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
remoteVersionBitmap);
handleVersionBitmapNegotiation(elements);
} else {
// versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
- handleStepByStepVersionNegotiation(remoteVersion);
+ handleStepByStepVersionNegotiation(remoteVersion.toJava());
}
} catch (Exception ex) {
errorHandler.handleException(ex);
ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
Futures.addCallback(helloResult, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
try {
stepByStepVersionSubStep(remoteVersion);
} catch (Exception e) {
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.info("hello sending seriously failed [{}]", nextHelloXid);
LOG.trace("detail of hello send problem", throwable);
}
}
}
- private void stepByStepVersionSubStep(Short remoteVersion) {
+ private void stepByStepVersionSubStep(final Short remoteVersion) {
if (remoteVersion >= lastProposedVersion) {
postHandshake(lastProposedVersion, getNextXid());
LOG.trace("ret - OK - switch answered with lastProposedVersion");
* @param remoteVersion remote version
* @throws Exception exception
*/
- private void handleLowerVersionProposal(Short remoteVersion) {
+ private void handleLowerVersionProposal(final Short remoteVersion) {
Short proposedVersion;
// find the version from header version field
proposedVersion = proposeNextVersion(remoteVersion);
* @param elements version elements
* @throws Exception exception
*/
- private void handleVersionBitmapNegotiation(List<Elements> elements) {
+ private void handleVersionBitmapNegotiation(final List<Elements> elements) {
final Short proposedVersion = proposeCommonBitmapVersion(elements);
if (lastProposedVersion == null) {
// first hello has not been sent yet
ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
Futures.addCallback(helloDone, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final Void result) {
LOG.trace("ret - DONE - versionBitmap");
postHandshake(proposedVersion, getNextXid());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
// NOOP
}
}, MoreExecutors.directExecutor());
return activeXid;
}
- private void setActiveXid(Long xid) {
+ private void setActiveXid(final Long xid) {
this.activeXid = xid;
}
*
* @param remoteVersion remove version
*/
- private void checkNegotiationStalling(Short remoteVersion) {
+ private void checkNegotiationStalling(final Short remoteVersion) {
if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
}
* @param list bitmap list
* @return proposed bitmap value
*/
- protected Short proposeCommonBitmapVersion(List<Elements> list) {
+ protected Short proposeCommonBitmapVersion(final List<Elements> list) {
Short supportedHighestVersion = null;
if (null != list && 0 != list.size()) {
for (Elements element : list) {
* @param remoteVersion openflow version supported by remote entity
* @return openflow version
*/
- protected short proposeNextVersion(short remoteVersion) {
+ protected short proposeNextVersion(final short remoteVersion) {
Short proposal = null;
for (short offer : versionOrder) {
if (offer <= remoteVersion) {
* @param helloVersion initial hello version for openflow connection negotiation
* @param helloXid transaction id
*/
- private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
+ private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
@Override
- public void onSuccess(RpcResult<HelloOutput> result) {
+ public void onSuccess(final RpcResult<HelloOutput> result) {
if (result.isSuccessful()) {
LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
connectionAdapter.getRemoteAddress());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
connectionAdapter.getRemoteAddress(), throwable.getMessage());
LOG.trace("DETAIL of sending of hello failure:", throwable);
Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
new FutureCallback<RpcResult<GetFeaturesOutput>>() {
@Override
- public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+ public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
LOG.trace("features are back");
if (rpcFeatures.isSuccessful()) {
GetFeaturesOutput featureOutput = rpcFeatures.getResult();
- if (!deviceConnectionRateLimiter.tryAquire()) {
- LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
- + " the connection from device {}", featureOutput.getDatapathId());
+
+ final Uint64 dpId = featureOutput.getDatapathId();
+ BigInteger datapathId = dpId == null ? null : dpId.toJava();
+ connectionAdapter.setDatapathId(datapathId);
+ if (datapathId == null || !isAllowedToConnect(datapathId)) {
connectionAdapter.disconnect();
return;
}
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
connectionAdapter.getRemoteAddress(), throwable.getMessage());
LOG.trace("DETAIL of sending of hello failure:", throwable);
LOG.debug("future features [{}] hooked ..", xid);
}
+ public boolean isAllowedToConnect(BigInteger nodeId) {
+ // The device isn't allowed for connection till device connection hold time is over
+ if (deviceConnectionHoldTime > 0) {
+ LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
+ if (lastConnectionTime == null) {
+ LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
+ + "seconds", nodeId, deviceConnectionHoldTime);
+ deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
+ return false;
+ } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
+ LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
+ + "{}", deviceConnectionHoldTime, nodeId);
+ return false;
+ }
+ }
+
+ if (!deviceConnectionRateLimiter.tryAquire()) {
+ LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
+ connectionAdapter.disconnect();
+ return false;
+ }
+ return true;
+ }
+
/**
* Method for unit testing, only.
* This method is not thread safe and can only safely be used from a test.
*/
@VisibleForTesting
@SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
- void setUseVersionBitmap(boolean useVersionBitmap) {
+ void setUseVersionBitmap(final boolean useVersionBitmap) {
this.useVersionBitmap = useVersionBitmap;
}