import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
import org.opendaylight.openflowplugin.impl.util.MessageFactory;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
private boolean useVersionBitmap; // not final just for unit test
+ private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
+
/**
* Constructor.
*
* @param useVersionBitmap should use negotiation bit map
*/
public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
- ErrorHandler errorHandler, HandshakeListener handshakeListener, boolean useVersionBitmap) {
+ ErrorHandler errorHandler, HandshakeListener handshakeListener,
+ boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
this.highestVersion = highestVersion;
this.versionOrder = versionOrder;
this.connectionAdapter = connectionAdapter;
this.errorHandler = errorHandler;
this.handshakeListener = handshakeListener;
this.useVersionBitmap = useVersionBitmap;
+ this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
}
@Override
LOG.info("hello sending seriously failed [{}]", nextHelloXid);
LOG.trace("detail of hello send problem", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
} else {
stepByStepVersionSubStep(remoteVersion);
}
}
- private void stepByStepVersionSubStep(Short remoteVersion) throws Exception {
- if (remoteVersion.equals(lastProposedVersion)) {
+ private void stepByStepVersionSubStep(Short remoteVersion) {
+ if (remoteVersion >= lastProposedVersion) {
postHandshake(lastProposedVersion, getNextXid());
LOG.trace("ret - OK - switch answered with lastProposedVersion");
} else {
* @param remoteVersion remote version
* @throws Exception exception
*/
- private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
+ private void handleLowerVersionProposal(Short remoteVersion) {
Short proposedVersion;
// find the version from header version field
proposedVersion = proposeNextVersion(remoteVersion);
* @param elements version elements
* @throws Exception exception
*/
- private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
+ private void handleVersionBitmapNegotiation(List<Elements> elements) {
final Short proposedVersion = proposeCommonBitmapVersion(elements);
if (lastProposedVersion == null) {
// first hello has not been sent yet
public void onFailure(Throwable throwable) {
// NOOP
}
- });
+ }, MoreExecutors.directExecutor());
LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
} else {
LOG.trace("ret - DONE - versionBitmap");
* @param helloVersion initial hello version for openflow connection negotiation
* @param helloXid transaction id
*/
- private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
+ private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
MessageFactory.digVersions(helloInput.getElements()));
- Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
+ Future<RpcResult<HelloOutput>> helloResult = connectionAdapter.hello(helloInput);
- ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
- Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+ ListenableFuture<RpcResult<HelloOutput>> rpcResultListenableFuture
+ = JdkFutureAdapters.listenInPoolThread(helloResult);
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<HelloOutput>>() {
@Override
- public void onSuccess(RpcResult<Void> result) {
+ public void onSuccess(@Nonnull RpcResult<HelloOutput> result) {
if (result.isSuccessful()) {
LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
connectionAdapter.getRemoteAddress());
resultFtr.cancel(false);
handshakeListener.onHandshakeFailure();
}
- });
+ }, MoreExecutors.directExecutor());
LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
return resultFtr;
}
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
new FutureCallback<RpcResult<GetFeaturesOutput>>() {
@Override
- public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+ public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) {
LOG.trace("features are back");
if (rpcFeatures.isSuccessful()) {
GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+ if (!deviceConnectionRateLimiter.tryAquire()) {
+ LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
+ + " the connection from device {}", featureOutput.getDatapathId());
+ connectionAdapter.disconnect();
+ return;
+ }
LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
connectionAdapter.getRemoteAddress(), throwable.getMessage());
LOG.trace("DETAIL of sending of hello failure:", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
LOG.debug("future features [{}] hooked ..", xid);
}