2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.connection;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.List;
18 import java.util.Objects;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.api.OFConstants;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
24 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
25 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class HandshakeManagerImpl implements HandshakeManager {
39 private static final long ACTIVE_XID = 20L;
41 private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
43 private Short lastProposedVersion;
44 private Short lastReceivedVersion;
45 private final List<Short> versionOrder;
47 private final ConnectionAdapter connectionAdapter;
48 private Short version;
49 private final ErrorHandler errorHandler;
51 private final Short highestVersion;
53 private Long activeXid;
55 private final HandshakeListener handshakeListener;
57 private boolean useVersionBitmap; // not final just for unit test
59 private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
64 * @param connectionAdapter connection adaptor for switch
65 * @param highestVersion highest openflow version
66 * @param versionOrder list of version in order for connection protocol negotiation
67 * @param errorHandler the ErrorHandler
68 * @param handshakeListener the HandshakeListener
69 * @param useVersionBitmap should use negotiation bit map
71 public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
72 ErrorHandler errorHandler, HandshakeListener handshakeListener,
73 boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
74 this.highestVersion = highestVersion;
75 this.versionOrder = versionOrder;
76 this.connectionAdapter = connectionAdapter;
77 this.errorHandler = errorHandler;
78 this.handshakeListener = handshakeListener;
79 this.useVersionBitmap = useVersionBitmap;
80 this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
84 @SuppressWarnings("checkstyle:IllegalCatch")
85 public synchronized void shake(HelloMessage receivedHello) {
87 if (version != null) {
88 // Some switches respond with a second HELLO acknowledging our HELLO
89 // but we've already completed the handshake based on the negotiated
90 // version and have registered this switch.
91 LOG.debug("Hello recieved after handshake already settled ... ignoring.");
95 LOG.trace("handshake STARTED");
96 setActiveXid(ACTIVE_XID);
99 if (receivedHello == null) {
100 // first Hello sending
101 sendHelloMessage(highestVersion, getNextXid());
102 lastProposedVersion = highestVersion;
103 LOG.trace("ret - firstHello+wait");
107 // process the 2. and later hellos
108 Short remoteVersion = receivedHello.getVersion();
109 List<Elements> elements = receivedHello.getElements();
110 setActiveXid(receivedHello.getXid());
111 List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
112 LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
113 remoteVersionBitmap);
115 if (useVersionBitmap && remoteVersionBitmap != null) {
116 // versionBitmap on both sides -> ONE STEP DECISION
117 handleVersionBitmapNegotiation(elements);
119 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
120 handleStepByStepVersionNegotiation(remoteVersion);
122 } catch (Exception ex) {
123 errorHandler.handleException(ex);
124 LOG.trace("ret - shake fail - closing");
125 handshakeListener.onHandshakeFailure();
130 * Handles the version negotiation step by step.
132 * @param remoteVersion remote version
133 * @throws Exception exception
135 @SuppressWarnings("checkstyle:IllegalCatch")
136 private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
137 LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
140 if (lastProposedVersion == null) {
141 // first hello has not been sent yet, send it and either wait for next remote
142 // version or proceed
143 lastProposedVersion = proposeNextVersion(remoteVersion);
144 final Long nextHelloXid = getNextXid();
145 ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
146 Futures.addCallback(helloResult, new FutureCallback<Void>() {
148 public void onSuccess(Void result) {
150 stepByStepVersionSubStep(remoteVersion);
151 } catch (Exception e) {
152 errorHandler.handleException(e);
153 handshakeListener.onHandshakeFailure();
158 public void onFailure(Throwable throwable) {
159 LOG.info("hello sending seriously failed [{}]", nextHelloXid);
160 LOG.trace("detail of hello send problem", throwable);
162 }, MoreExecutors.directExecutor());
164 stepByStepVersionSubStep(remoteVersion);
168 private void stepByStepVersionSubStep(Short remoteVersion) {
169 if (remoteVersion >= lastProposedVersion) {
170 postHandshake(lastProposedVersion, getNextXid());
171 LOG.trace("ret - OK - switch answered with lastProposedVersion");
173 checkNegotiationStalling(remoteVersion);
175 if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
176 // wait for next version
177 LOG.trace("ret - wait");
179 //propose lower version
180 handleLowerVersionProposal(remoteVersion);
186 * Handles a proposal for a lower version.
188 * @param remoteVersion remote version
189 * @throws Exception exception
191 private void handleLowerVersionProposal(Short remoteVersion) {
192 Short proposedVersion;
193 // find the version from header version field
194 proposedVersion = proposeNextVersion(remoteVersion);
195 lastProposedVersion = proposedVersion;
196 sendHelloMessage(proposedVersion, getNextXid());
198 if (!Objects.equals(proposedVersion, remoteVersion)) {
199 LOG.trace("ret - sent+wait");
201 LOG.trace("ret - sent+OK");
202 postHandshake(proposedVersion, getNextXid());
207 * Handles the negotiation of the version bitmap.
209 * @param elements version elements
210 * @throws Exception exception
212 private void handleVersionBitmapNegotiation(List<Elements> elements) {
213 final Short proposedVersion = proposeCommonBitmapVersion(elements);
214 if (lastProposedVersion == null) {
215 // first hello has not been sent yet
216 Long nexHelloXid = getNextXid();
217 ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
218 Futures.addCallback(helloDone, new FutureCallback<Void>() {
220 public void onSuccess(Void result) {
221 LOG.trace("ret - DONE - versionBitmap");
222 postHandshake(proposedVersion, getNextXid());
226 public void onFailure(Throwable throwable) {
229 }, MoreExecutors.directExecutor());
230 LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
232 LOG.trace("ret - DONE - versionBitmap");
233 postHandshake(proposedVersion, getNextXid());
237 private Long getNextXid() {
242 private void setActiveXid(Long xid) {
243 this.activeXid = xid;
247 * Checks negotiation stalling.
249 * @param remoteVersion remove version
251 private void checkNegotiationStalling(Short remoteVersion) {
252 if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
253 throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
255 lastReceivedVersion = remoteVersion;
259 public Short getVersion() {
264 * find common highest supported bitmap version.
266 * @param list bitmap list
267 * @return proposed bitmap value
269 protected Short proposeCommonBitmapVersion(List<Elements> list) {
270 Short supportedHighestVersion = null;
271 if (null != list && 0 != list.size()) {
272 for (Elements element : list) {
273 List<Boolean> bitmap = element.getVersionBitmap();
274 // check for version bitmap
275 for (short bitPos : OFConstants.VERSION_ORDER) {
276 // with all the version it should work.
277 if (bitmap.get(bitPos % Integer.SIZE)) {
278 supportedHighestVersion = bitPos;
284 if (null == supportedHighestVersion) {
285 LOG.trace("versionBitmap: no common version found");
286 throw new IllegalArgumentException("no common version found in versionBitmap");
290 return supportedHighestVersion;
294 * find supported version based on remoteVersion.
296 * @param remoteVersion openflow version supported by remote entity
297 * @return openflow version
299 protected short proposeNextVersion(short remoteVersion) {
300 Short proposal = null;
301 for (short offer : versionOrder) {
302 if (offer <= remoteVersion) {
307 if (proposal == null) {
308 throw new IllegalArgumentException(
309 "no equal or lower version found, unsupported version: " + remoteVersion);
315 * send hello reply without versionBitmap.
317 * @param helloVersion initial hello version for openflow connection negotiation
318 * @param helloXid transaction id
320 private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
323 HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
325 final SettableFuture<Void> resultFtr = SettableFuture.create();
327 LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
328 MessageFactory.digVersions(helloInput.getElements()));
330 Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
332 public void onSuccess(RpcResult<HelloOutput> result) {
333 if (result.isSuccessful()) {
334 LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
335 connectionAdapter.getRemoteAddress());
338 for (RpcError error : result.getErrors()) {
339 LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
340 error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
341 if (error.getCause() != null) {
342 LOG.trace("DETAIL of sending hello failure", error.getCause());
345 resultFtr.cancel(false);
346 handshakeListener.onHandshakeFailure();
351 public void onFailure(Throwable throwable) {
352 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
353 connectionAdapter.getRemoteAddress(), throwable.getMessage());
354 LOG.trace("DETAIL of sending of hello failure:", throwable);
355 resultFtr.cancel(false);
356 handshakeListener.onHandshakeFailure();
358 }, MoreExecutors.directExecutor());
359 LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
365 * after handshake set features, register to session.
367 * @param proposedVersion proposed openflow version
368 * @param xid transaction id
370 protected void postHandshake(final Short proposedVersion, final Long xid) {
372 version = proposedVersion;
374 LOG.debug("version set: {}", proposedVersion);
376 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
377 featuresBuilder.setVersion(version).setXid(xid);
378 LOG.debug("sending feature request for version={} and xid={}", version, xid);
380 Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
381 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
383 public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
384 LOG.trace("features are back");
385 if (rpcFeatures.isSuccessful()) {
386 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
387 connectionAdapter.setDatapathId(featureOutput.getDatapathId());
388 if (!deviceConnectionRateLimiter.tryAquire()) {
389 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
390 + " the connection from device {}", featureOutput.getDatapathId());
391 connectionAdapter.disconnect();
395 LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
396 LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
397 LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
398 version, featureOutput.getDatapathId(),
399 featureOutput.getAuxiliaryId());
400 handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
403 LOG.warn("issuing disconnect during handshake [{}]",
404 connectionAdapter.getRemoteAddress());
405 for (RpcError rpcError : rpcFeatures.getErrors()) {
406 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
407 rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
408 rpcError.getCause());
410 handshakeListener.onHandshakeFailure();
413 LOG.debug("postHandshake DONE");
417 public void onFailure(Throwable throwable) {
418 LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
419 connectionAdapter.getRemoteAddress(), throwable.getMessage());
420 LOG.trace("DETAIL of sending of hello failure:", throwable);
422 }, MoreExecutors.directExecutor());
423 LOG.debug("future features [{}] hooked ..", xid);
427 * Method for unit testing, only.
428 * This method is not thread safe and can only safely be used from a test.
431 @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
432 void setUseVersionBitmap(boolean useVersionBitmap) {
433 this.useVersionBitmap = useVersionBitmap;