/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.connection;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.List;
18 import java.util.Objects;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.api.OFConstants;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
24 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
25 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.common.Uint64;
35 import org.opendaylight.yangtools.yang.common.Uint8;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 public class HandshakeManagerImpl implements HandshakeManager {
41 private static final long ACTIVE_XID = 20L;
43 private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
45 private Short lastProposedVersion;
46 private Short lastReceivedVersion;
47 private final List<Short> versionOrder;
49 private final ConnectionAdapter connectionAdapter;
50 private Short version;
51 private final ErrorHandler errorHandler;
53 private final Short highestVersion;
55 private Long activeXid;
57 private final HandshakeListener handshakeListener;
59 private boolean useVersionBitmap; // not final just for unit test
61 private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
/**
 * Creates a handshake manager for a single switch connection. The arguments are stored as given;
 * no validation or copying takes place.
 *
 * @param connectionAdapter connection adaptor for switch
 * @param highestVersion highest openflow version
 * @param versionOrder list of version in order for connection protocol negotiation
 * @param errorHandler the ErrorHandler
 * @param handshakeListener the HandshakeListener
 * @param useVersionBitmap should use negotiation bit map
 * @param deviceConnectionRateLimiter limiter consulted after the switch features have been obtained;
 *                                    when it refuses, the connection is disconnected
 */
public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
        final List<Short> versionOrder,
        final ErrorHandler errorHandler, final HandshakeListener handshakeListener,
        final boolean useVersionBitmap,
        final DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
    // collaborators
    this.connectionAdapter = connectionAdapter;
    this.errorHandler = errorHandler;
    this.handshakeListener = handshakeListener;
    this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;

    // negotiation configuration
    this.highestVersion = highestVersion;
    this.versionOrder = versionOrder;
    this.useVersionBitmap = useVersionBitmap;
}
88 @SuppressWarnings("checkstyle:IllegalCatch")
89 public synchronized void shake(final HelloMessage receivedHello) {
91 if (version != null) {
92 // Some switches respond with a second HELLO acknowledging our HELLO
93 // but we've already completed the handshake based on the negotiated
94 // version and have registered this switch.
95 LOG.debug("Hello recieved after handshake already settled ... ignoring.");
99 LOG.trace("handshake STARTED");
100 setActiveXid(ACTIVE_XID);
103 if (receivedHello == null) {
104 // first Hello sending
105 sendHelloMessage(highestVersion, getNextXid());
106 lastProposedVersion = highestVersion;
107 LOG.trace("ret - firstHello+wait");
111 // process the 2. and later hellos
112 Uint8 remoteVersion = receivedHello.getVersion();
113 List<Elements> elements = receivedHello.getElements();
114 setActiveXid(receivedHello.getXid().toJava());
115 List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
116 LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
117 remoteVersionBitmap);
119 if (useVersionBitmap && remoteVersionBitmap != null) {
120 // versionBitmap on both sides -> ONE STEP DECISION
121 handleVersionBitmapNegotiation(elements);
123 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
124 handleStepByStepVersionNegotiation(remoteVersion.toJava());
126 } catch (Exception ex) {
127 errorHandler.handleException(ex);
128 LOG.trace("ret - shake fail - closing");
129 handshakeListener.onHandshakeFailure();
134 * Handles the version negotiation step by step.
136 * @param remoteVersion remote version
137 * @throws Exception exception
139 @SuppressWarnings("checkstyle:IllegalCatch")
140 private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
141 LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
144 if (lastProposedVersion == null) {
145 // first hello has not been sent yet, send it and either wait for next remote
146 // version or proceed
147 lastProposedVersion = proposeNextVersion(remoteVersion);
148 final Long nextHelloXid = getNextXid();
149 ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
150 Futures.addCallback(helloResult, new FutureCallback<Void>() {
152 public void onSuccess(final Void result) {
154 stepByStepVersionSubStep(remoteVersion);
155 } catch (Exception e) {
156 errorHandler.handleException(e);
157 handshakeListener.onHandshakeFailure();
162 public void onFailure(final Throwable throwable) {
163 LOG.info("hello sending seriously failed [{}]", nextHelloXid);
164 LOG.trace("detail of hello send problem", throwable);
166 }, MoreExecutors.directExecutor());
168 stepByStepVersionSubStep(remoteVersion);
172 private void stepByStepVersionSubStep(final Short remoteVersion) {
173 if (remoteVersion >= lastProposedVersion) {
174 postHandshake(lastProposedVersion, getNextXid());
175 LOG.trace("ret - OK - switch answered with lastProposedVersion");
177 checkNegotiationStalling(remoteVersion);
179 if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
180 // wait for next version
181 LOG.trace("ret - wait");
183 //propose lower version
184 handleLowerVersionProposal(remoteVersion);
190 * Handles a proposal for a lower version.
192 * @param remoteVersion remote version
193 * @throws Exception exception
195 private void handleLowerVersionProposal(final Short remoteVersion) {
196 Short proposedVersion;
197 // find the version from header version field
198 proposedVersion = proposeNextVersion(remoteVersion);
199 lastProposedVersion = proposedVersion;
200 sendHelloMessage(proposedVersion, getNextXid());
202 if (!Objects.equals(proposedVersion, remoteVersion)) {
203 LOG.trace("ret - sent+wait");
205 LOG.trace("ret - sent+OK");
206 postHandshake(proposedVersion, getNextXid());
211 * Handles the negotiation of the version bitmap.
213 * @param elements version elements
214 * @throws Exception exception
216 private void handleVersionBitmapNegotiation(final List<Elements> elements) {
217 final Short proposedVersion = proposeCommonBitmapVersion(elements);
218 if (lastProposedVersion == null) {
219 // first hello has not been sent yet
220 Long nexHelloXid = getNextXid();
221 ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
222 Futures.addCallback(helloDone, new FutureCallback<Void>() {
224 public void onSuccess(final Void result) {
225 LOG.trace("ret - DONE - versionBitmap");
226 postHandshake(proposedVersion, getNextXid());
230 public void onFailure(final Throwable throwable) {
233 }, MoreExecutors.directExecutor());
234 LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
236 LOG.trace("ret - DONE - versionBitmap");
237 postHandshake(proposedVersion, getNextXid());
241 private Long getNextXid() {
246 private void setActiveXid(final Long xid) {
247 this.activeXid = xid;
251 * Checks negotiation stalling.
253 * @param remoteVersion remove version
255 private void checkNegotiationStalling(final Short remoteVersion) {
256 if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
257 throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
259 lastReceivedVersion = remoteVersion;
263 public Short getVersion() {
268 * find common highest supported bitmap version.
270 * @param list bitmap list
271 * @return proposed bitmap value
273 protected Short proposeCommonBitmapVersion(final List<Elements> list) {
274 Short supportedHighestVersion = null;
275 if (null != list && 0 != list.size()) {
276 for (Elements element : list) {
277 List<Boolean> bitmap = element.getVersionBitmap();
278 // check for version bitmap
279 for (short bitPos : OFConstants.VERSION_ORDER) {
280 // with all the version it should work.
281 if (bitmap.get(bitPos % Integer.SIZE)) {
282 supportedHighestVersion = bitPos;
288 if (null == supportedHighestVersion) {
289 LOG.trace("versionBitmap: no common version found");
290 throw new IllegalArgumentException("no common version found in versionBitmap");
294 return supportedHighestVersion;
298 * find supported version based on remoteVersion.
300 * @param remoteVersion openflow version supported by remote entity
301 * @return openflow version
303 protected short proposeNextVersion(final short remoteVersion) {
304 Short proposal = null;
305 for (short offer : versionOrder) {
306 if (offer <= remoteVersion) {
311 if (proposal == null) {
312 throw new IllegalArgumentException(
313 "no equal or lower version found, unsupported version: " + remoteVersion);
319 * send hello reply without versionBitmap.
321 * @param helloVersion initial hello version for openflow connection negotiation
322 * @param helloXid transaction id
324 private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
327 HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
329 final SettableFuture<Void> resultFtr = SettableFuture.create();
331 LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
332 MessageFactory.digVersions(helloInput.getElements()));
334 Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
336 public void onSuccess(final RpcResult<HelloOutput> result) {
337 if (result.isSuccessful()) {
338 LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
339 connectionAdapter.getRemoteAddress());
342 for (RpcError error : result.getErrors()) {
343 LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
344 error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
345 if (error.getCause() != null) {
346 LOG.trace("DETAIL of sending hello failure", error.getCause());
349 resultFtr.cancel(false);
350 handshakeListener.onHandshakeFailure();
355 public void onFailure(final Throwable throwable) {
356 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
357 connectionAdapter.getRemoteAddress(), throwable.getMessage());
358 LOG.trace("DETAIL of sending of hello failure:", throwable);
359 resultFtr.cancel(false);
360 handshakeListener.onHandshakeFailure();
362 }, MoreExecutors.directExecutor());
363 LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
369 * after handshake set features, register to session.
371 * @param proposedVersion proposed openflow version
372 * @param xid transaction id
374 protected void postHandshake(final Short proposedVersion, final Long xid) {
376 version = proposedVersion;
378 LOG.debug("version set: {}", proposedVersion);
380 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
381 featuresBuilder.setVersion(version).setXid(xid);
382 LOG.debug("sending feature request for version={} and xid={}", version, xid);
384 Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
385 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
387 public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
388 LOG.trace("features are back");
389 if (rpcFeatures.isSuccessful()) {
390 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
392 final Uint64 dpId = featureOutput.getDatapathId();
393 connectionAdapter.setDatapathId(dpId == null ? null : dpId.toJava());
394 if (!deviceConnectionRateLimiter.tryAquire()) {
395 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
396 + " the connection from device {}", featureOutput.getDatapathId());
397 connectionAdapter.disconnect();
401 LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
402 LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
403 LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
404 version, featureOutput.getDatapathId(),
405 featureOutput.getAuxiliaryId());
406 handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
409 LOG.warn("issuing disconnect during handshake [{}]",
410 connectionAdapter.getRemoteAddress());
411 for (RpcError rpcError : rpcFeatures.getErrors()) {
412 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
413 rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
414 rpcError.getCause());
416 handshakeListener.onHandshakeFailure();
419 LOG.debug("postHandshake DONE");
423 public void onFailure(final Throwable throwable) {
424 LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
425 connectionAdapter.getRemoteAddress(), throwable.getMessage());
426 LOG.trace("DETAIL of sending of hello failure:", throwable);
428 }, MoreExecutors.directExecutor());
429 LOG.debug("future features [{}] hooked ..", xid);
433 * Method for unit testing, only.
434 * This method is not thread safe and can only safely be used from a test.
437 @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
438 void setUseVersionBitmap(final boolean useVersionBitmap) {
439 this.useVersionBitmap = useVersionBitmap;