/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.connection;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.math.BigInteger;
18 import java.time.LocalDateTime;
19 import java.util.List;
20 import java.util.Objects;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowplugin.api.OFConstants;
23 import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
27 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
28 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.opendaylight.yangtools.yang.common.Uint64;
38 import org.opendaylight.yangtools.yang.common.Uint8;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 public class HandshakeManagerImpl implements HandshakeManager {
44 private static final long ACTIVE_XID = 20L;
46 private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
48 private Short lastProposedVersion;
49 private Short lastReceivedVersion;
50 private final List<Short> versionOrder;
52 private final ConnectionAdapter connectionAdapter;
53 private Short version;
54 private final ErrorHandler errorHandler;
56 private final Short highestVersion;
58 private Long activeXid;
60 private final HandshakeListener handshakeListener;
62 private boolean useVersionBitmap; // not final just for unit test
64 private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
65 private final int deviceConnectionHoldTime;
66 private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
/**
 * Creates a handshake manager for one switch connection. The arguments are stored as given;
 * no validation or copying is performed here.
 *
 * @param connectionAdapter connection adapter for the switch
 * @param highestVersion highest OpenFlow version, proposed in the first hello
 * @param versionOrder list of versions, in order, used for protocol negotiation
 * @param errorHandler the ErrorHandler
 * @param handshakeListener the HandshakeListener
 * @param useVersionBitmap whether the version bitmap should be used for negotiation
 * @param deviceConnectionRateLimiter device connection rate limiter utility
 * @param deviceConnectionHoldTime device connection hold time in seconds
 * @param deviceConnectionStatusProvider utility for maintaining device connection states
 */
public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
        final List<Short> versionOrder, final ErrorHandler errorHandler,
        final HandshakeListener handshakeListener, final boolean useVersionBitmap,
        final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
        final int deviceConnectionHoldTime,
        final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
    // connection and callbacks
    this.connectionAdapter = connectionAdapter;
    this.errorHandler = errorHandler;
    this.handshakeListener = handshakeListener;

    // version negotiation settings
    this.highestVersion = highestVersion;
    this.versionOrder = versionOrder;
    this.useVersionBitmap = useVersionBitmap;

    // connection throttling
    this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
    this.deviceConnectionHoldTime = deviceConnectionHoldTime;
    this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
}
99 @SuppressWarnings("checkstyle:IllegalCatch")
100 public synchronized void shake(final HelloMessage receivedHello) {
102 if (version != null) {
103 // Some switches respond with a second HELLO acknowledging our HELLO
104 // but we've already completed the handshake based on the negotiated
105 // version and have registered this switch.
106 LOG.debug("Hello recieved after handshake already settled ... ignoring.");
110 LOG.trace("handshake STARTED");
111 setActiveXid(ACTIVE_XID);
114 if (receivedHello == null) {
115 // first Hello sending
116 sendHelloMessage(highestVersion, getNextXid());
117 lastProposedVersion = highestVersion;
118 LOG.trace("ret - firstHello+wait");
122 // process the 2. and later hellos
123 Uint8 remoteVersion = receivedHello.getVersion();
124 List<Elements> elements = receivedHello.getElements();
125 setActiveXid(receivedHello.getXid().toJava());
126 List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
127 LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
128 remoteVersionBitmap);
130 if (useVersionBitmap && remoteVersionBitmap != null) {
131 // versionBitmap on both sides -> ONE STEP DECISION
132 handleVersionBitmapNegotiation(elements);
134 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
135 handleStepByStepVersionNegotiation(remoteVersion.toJava());
137 } catch (Exception ex) {
138 errorHandler.handleException(ex);
139 LOG.trace("ret - shake fail - closing");
140 handshakeListener.onHandshakeFailure();
145 * Handles the version negotiation step by step.
147 * @param remoteVersion remote version
148 * @throws Exception exception
150 @SuppressWarnings("checkstyle:IllegalCatch")
151 private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
152 LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
155 if (lastProposedVersion == null) {
156 // first hello has not been sent yet, send it and either wait for next remote
157 // version or proceed
158 lastProposedVersion = proposeNextVersion(remoteVersion);
159 final Long nextHelloXid = getNextXid();
160 ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
161 Futures.addCallback(helloResult, new FutureCallback<Void>() {
163 public void onSuccess(final Void result) {
165 stepByStepVersionSubStep(remoteVersion);
166 } catch (Exception e) {
167 errorHandler.handleException(e);
168 handshakeListener.onHandshakeFailure();
173 public void onFailure(final Throwable throwable) {
174 LOG.info("hello sending seriously failed [{}]", nextHelloXid);
175 LOG.trace("detail of hello send problem", throwable);
177 }, MoreExecutors.directExecutor());
179 stepByStepVersionSubStep(remoteVersion);
183 private void stepByStepVersionSubStep(final Short remoteVersion) {
184 if (remoteVersion >= lastProposedVersion) {
185 postHandshake(lastProposedVersion, getNextXid());
186 LOG.trace("ret - OK - switch answered with lastProposedVersion");
188 checkNegotiationStalling(remoteVersion);
190 if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
191 // wait for next version
192 LOG.trace("ret - wait");
194 //propose lower version
195 handleLowerVersionProposal(remoteVersion);
201 * Handles a proposal for a lower version.
203 * @param remoteVersion remote version
204 * @throws Exception exception
206 private void handleLowerVersionProposal(final Short remoteVersion) {
207 Short proposedVersion;
208 // find the version from header version field
209 proposedVersion = proposeNextVersion(remoteVersion);
210 lastProposedVersion = proposedVersion;
211 sendHelloMessage(proposedVersion, getNextXid());
213 if (!Objects.equals(proposedVersion, remoteVersion)) {
214 LOG.trace("ret - sent+wait");
216 LOG.trace("ret - sent+OK");
217 postHandshake(proposedVersion, getNextXid());
222 * Handles the negotiation of the version bitmap.
224 * @param elements version elements
225 * @throws Exception exception
227 private void handleVersionBitmapNegotiation(final List<Elements> elements) {
228 final Short proposedVersion = proposeCommonBitmapVersion(elements);
229 if (lastProposedVersion == null) {
230 // first hello has not been sent yet
231 Long nexHelloXid = getNextXid();
232 ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
233 Futures.addCallback(helloDone, new FutureCallback<Void>() {
235 public void onSuccess(final Void result) {
236 LOG.trace("ret - DONE - versionBitmap");
237 postHandshake(proposedVersion, getNextXid());
241 public void onFailure(final Throwable throwable) {
244 }, MoreExecutors.directExecutor());
245 LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
247 LOG.trace("ret - DONE - versionBitmap");
248 postHandshake(proposedVersion, getNextXid());
252 private Long getNextXid() {
257 private void setActiveXid(final Long xid) {
258 this.activeXid = xid;
262 * Checks negotiation stalling.
264 * @param remoteVersion remove version
266 private void checkNegotiationStalling(final Short remoteVersion) {
267 if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
268 throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
270 lastReceivedVersion = remoteVersion;
274 public Short getVersion() {
279 * find common highest supported bitmap version.
281 * @param list bitmap list
282 * @return proposed bitmap value
284 protected Short proposeCommonBitmapVersion(final List<Elements> list) {
285 Short supportedHighestVersion = null;
286 if (null != list && 0 != list.size()) {
287 for (Elements element : list) {
288 List<Boolean> bitmap = element.getVersionBitmap();
289 // check for version bitmap
290 for (short bitPos : OFConstants.VERSION_ORDER) {
291 // with all the version it should work.
292 if (bitmap.get(bitPos % Integer.SIZE)) {
293 supportedHighestVersion = bitPos;
299 if (null == supportedHighestVersion) {
300 LOG.trace("versionBitmap: no common version found");
301 throw new IllegalArgumentException("no common version found in versionBitmap");
305 return supportedHighestVersion;
309 * find supported version based on remoteVersion.
311 * @param remoteVersion openflow version supported by remote entity
312 * @return openflow version
314 protected short proposeNextVersion(final short remoteVersion) {
315 Short proposal = null;
316 for (short offer : versionOrder) {
317 if (offer <= remoteVersion) {
322 if (proposal == null) {
323 throw new IllegalArgumentException(
324 "no equal or lower version found, unsupported version: " + remoteVersion);
330 * send hello reply without versionBitmap.
332 * @param helloVersion initial hello version for openflow connection negotiation
333 * @param helloXid transaction id
335 private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
338 HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
340 final SettableFuture<Void> resultFtr = SettableFuture.create();
342 LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
343 MessageFactory.digVersions(helloInput.getElements()));
345 Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
347 public void onSuccess(final RpcResult<HelloOutput> result) {
348 if (result.isSuccessful()) {
349 LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
350 connectionAdapter.getRemoteAddress());
353 for (RpcError error : result.getErrors()) {
354 LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
355 error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
356 if (error.getCause() != null) {
357 LOG.trace("DETAIL of sending hello failure", error.getCause());
360 resultFtr.cancel(false);
361 handshakeListener.onHandshakeFailure();
366 public void onFailure(final Throwable throwable) {
367 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
368 connectionAdapter.getRemoteAddress(), throwable.getMessage());
369 LOG.trace("DETAIL of sending of hello failure:", throwable);
370 resultFtr.cancel(false);
371 handshakeListener.onHandshakeFailure();
373 }, MoreExecutors.directExecutor());
374 LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
380 * after handshake set features, register to session.
382 * @param proposedVersion proposed openflow version
383 * @param xid transaction id
385 protected void postHandshake(final Short proposedVersion, final Long xid) {
387 version = proposedVersion;
389 LOG.debug("version set: {}", proposedVersion);
391 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
392 featuresBuilder.setVersion(version).setXid(xid);
393 LOG.debug("sending feature request for version={} and xid={}", version, xid);
395 Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
396 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
398 public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
399 LOG.trace("features are back");
400 if (rpcFeatures.isSuccessful()) {
401 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
403 final Uint64 dpId = featureOutput.getDatapathId();
404 BigInteger datapathId = dpId == null ? null : dpId.toJava();
405 connectionAdapter.setDatapathId(datapathId);
406 if (datapathId == null || !isAllowedToConnect(datapathId)) {
407 connectionAdapter.disconnect();
411 LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
412 LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
413 LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
414 version, featureOutput.getDatapathId(),
415 featureOutput.getAuxiliaryId());
416 handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
419 LOG.warn("issuing disconnect during handshake [{}]",
420 connectionAdapter.getRemoteAddress());
421 for (RpcError rpcError : rpcFeatures.getErrors()) {
422 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
423 rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
424 rpcError.getCause());
426 handshakeListener.onHandshakeFailure();
429 LOG.debug("postHandshake DONE");
433 public void onFailure(final Throwable throwable) {
434 LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
435 connectionAdapter.getRemoteAddress(), throwable.getMessage());
436 LOG.trace("DETAIL of sending of hello failure:", throwable);
438 }, MoreExecutors.directExecutor());
439 LOG.debug("future features [{}] hooked ..", xid);
442 public boolean isAllowedToConnect(BigInteger nodeId) {
443 // The device isn't allowed for connection till device connection hold time is over
444 if (deviceConnectionHoldTime > 0) {
445 LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
446 if (lastConnectionTime == null) {
447 LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
448 + "seconds", nodeId, deviceConnectionHoldTime);
449 deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
451 } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
452 LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
453 + "{}", deviceConnectionHoldTime, nodeId);
458 if (!deviceConnectionRateLimiter.tryAquire()) {
459 LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
460 connectionAdapter.disconnect();
467 * Method for unit testing, only.
468 * This method is not thread safe and can only safely be used from a test.
471 @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
472 void setUseVersionBitmap(final boolean useVersionBitmap) {
473 this.useVersionBitmap = useVersionBitmap;