2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.connection;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.Future;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
23 import org.opendaylight.openflowplugin.api.OFConstants;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
27 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
28 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 public class HandshakeManagerImpl implements HandshakeManager {
42 private static final long ACTIVE_XID = 20L;
44 private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
46 private Short lastProposedVersion;
47 private Short lastReceivedVersion;
48 private final List<Short> versionOrder;
50 private final ConnectionAdapter connectionAdapter;
51 private Short version;
52 private final ErrorHandler errorHandler;
54 private final Short highestVersion;
56 private Long activeXid;
58 private final HandshakeListener handshakeListener;
60 private boolean useVersionBitmap; // not final just for unit test
62 private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
67 * @param connectionAdapter connection adaptor for switch
68 * @param highestVersion highest openflow version
69 * @param versionOrder list of version in order for connection protocol negotiation
70 * @param errorHandler the ErrorHandler
71 * @param handshakeListener the HandshakeListener
72 * @param useVersionBitmap should use negotiation bit map
74 public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
75 ErrorHandler errorHandler, HandshakeListener handshakeListener,
76 boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
77 this.highestVersion = highestVersion;
78 this.versionOrder = versionOrder;
79 this.connectionAdapter = connectionAdapter;
80 this.errorHandler = errorHandler;
81 this.handshakeListener = handshakeListener;
82 this.useVersionBitmap = useVersionBitmap;
83 this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
87 @SuppressWarnings("checkstyle:IllegalCatch")
88 public synchronized void shake(HelloMessage receivedHello) {
90 if (version != null) {
91 // Some switches respond with a second HELLO acknowledging our HELLO
92 // but we've already completed the handshake based on the negotiated
93 // version and have registered this switch.
94 LOG.debug("Hello recieved after handshake already settled ... ignoring.");
98 LOG.trace("handshake STARTED");
99 setActiveXid(ACTIVE_XID);
102 if (receivedHello == null) {
103 // first Hello sending
104 sendHelloMessage(highestVersion, getNextXid());
105 lastProposedVersion = highestVersion;
106 LOG.trace("ret - firstHello+wait");
110 // process the 2. and later hellos
111 Short remoteVersion = receivedHello.getVersion();
112 List<Elements> elements = receivedHello.getElements();
113 setActiveXid(receivedHello.getXid());
114 List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
115 LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
116 remoteVersionBitmap);
118 if (useVersionBitmap && remoteVersionBitmap != null) {
119 // versionBitmap on both sides -> ONE STEP DECISION
120 handleVersionBitmapNegotiation(elements);
122 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
123 handleStepByStepVersionNegotiation(remoteVersion);
125 } catch (Exception ex) {
126 errorHandler.handleException(ex);
127 LOG.trace("ret - shake fail - closing");
128 handshakeListener.onHandshakeFailure();
133 * Handles the version negotiation step by step.
135 * @param remoteVersion remote version
136 * @throws Exception exception
138 @SuppressWarnings("checkstyle:IllegalCatch")
139 private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
140 LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
143 if (lastProposedVersion == null) {
144 // first hello has not been sent yet, send it and either wait for next remote
145 // version or proceed
146 lastProposedVersion = proposeNextVersion(remoteVersion);
147 final Long nextHelloXid = getNextXid();
148 ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
149 Futures.addCallback(helloResult, new FutureCallback<Void>() {
151 public void onSuccess(Void result) {
153 stepByStepVersionSubStep(remoteVersion);
154 } catch (Exception e) {
155 errorHandler.handleException(e);
156 handshakeListener.onHandshakeFailure();
161 public void onFailure(Throwable throwable) {
162 LOG.info("hello sending seriously failed [{}]", nextHelloXid);
163 LOG.trace("detail of hello send problem", throwable);
165 }, MoreExecutors.directExecutor());
167 stepByStepVersionSubStep(remoteVersion);
171 private void stepByStepVersionSubStep(Short remoteVersion) throws Exception {
172 if (remoteVersion.equals(lastProposedVersion)) {
173 postHandshake(lastProposedVersion, getNextXid());
174 LOG.trace("ret - OK - switch answered with lastProposedVersion");
176 checkNegotiationStalling(remoteVersion);
178 if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
179 // wait for next version
180 LOG.trace("ret - wait");
182 //propose lower version
183 handleLowerVersionProposal(remoteVersion);
189 * Handles a proposal for a lower version.
191 * @param remoteVersion remote version
192 * @throws Exception exception
194 private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
195 Short proposedVersion;
196 // find the version from header version field
197 proposedVersion = proposeNextVersion(remoteVersion);
198 lastProposedVersion = proposedVersion;
199 sendHelloMessage(proposedVersion, getNextXid());
201 if (!Objects.equals(proposedVersion, remoteVersion)) {
202 LOG.trace("ret - sent+wait");
204 LOG.trace("ret - sent+OK");
205 postHandshake(proposedVersion, getNextXid());
210 * Handles the negotiation of the version bitmap.
212 * @param elements version elements
213 * @throws Exception exception
215 private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
216 final Short proposedVersion = proposeCommonBitmapVersion(elements);
217 if (lastProposedVersion == null) {
218 // first hello has not been sent yet
219 Long nexHelloXid = getNextXid();
220 ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
221 Futures.addCallback(helloDone, new FutureCallback<Void>() {
223 public void onSuccess(Void result) {
224 LOG.trace("ret - DONE - versionBitmap");
225 postHandshake(proposedVersion, getNextXid());
229 public void onFailure(Throwable throwable) {
232 }, MoreExecutors.directExecutor());
233 LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
235 LOG.trace("ret - DONE - versionBitmap");
236 postHandshake(proposedVersion, getNextXid());
240 private Long getNextXid() {
245 private void setActiveXid(Long xid) {
246 this.activeXid = xid;
250 * Checks negotiation stalling.
252 * @param remoteVersion remove version
254 private void checkNegotiationStalling(Short remoteVersion) {
255 if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
256 throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
258 lastReceivedVersion = remoteVersion;
262 public Short getVersion() {
267 * find common highest supported bitmap version.
269 * @param list bitmap list
270 * @return proposed bitmap value
272 protected Short proposeCommonBitmapVersion(List<Elements> list) {
273 Short supportedHighestVersion = null;
274 if (null != list && 0 != list.size()) {
275 for (Elements element : list) {
276 List<Boolean> bitmap = element.getVersionBitmap();
277 // check for version bitmap
278 for (short bitPos : OFConstants.VERSION_ORDER) {
279 // with all the version it should work.
280 if (bitmap.get(bitPos % Integer.SIZE)) {
281 supportedHighestVersion = bitPos;
287 if (null == supportedHighestVersion) {
288 LOG.trace("versionBitmap: no common version found");
289 throw new IllegalArgumentException("no common version found in versionBitmap");
293 return supportedHighestVersion;
297 * find supported version based on remoteVersion.
299 * @param remoteVersion openflow version supported by remote entity
300 * @return openflow version
302 protected short proposeNextVersion(short remoteVersion) {
303 Short proposal = null;
304 for (short offer : versionOrder) {
305 if (offer <= remoteVersion) {
310 if (proposal == null) {
311 throw new IllegalArgumentException(
312 "no equal or lower version found, unsupported version: " + remoteVersion);
318 * send hello reply without versionBitmap.
320 * @param helloVersion initial hello version for openflow connection negotiation
321 * @param helloXid transaction id
323 private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
326 HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
328 final SettableFuture<Void> resultFtr = SettableFuture.create();
330 LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
331 MessageFactory.digVersions(helloInput.getElements()));
333 Future<RpcResult<HelloOutput>> helloResult = connectionAdapter.hello(helloInput);
335 ListenableFuture<RpcResult<HelloOutput>> rpcResultListenableFuture
336 = JdkFutureAdapters.listenInPoolThread(helloResult);
337 Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<HelloOutput>>() {
339 public void onSuccess(@Nonnull RpcResult<HelloOutput> result) {
340 if (result.isSuccessful()) {
341 LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
342 connectionAdapter.getRemoteAddress());
345 for (RpcError error : result.getErrors()) {
346 LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
347 error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
348 if (error.getCause() != null) {
349 LOG.trace("DETAIL of sending hello failure", error.getCause());
352 resultFtr.cancel(false);
353 handshakeListener.onHandshakeFailure();
358 public void onFailure(Throwable throwable) {
359 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
360 connectionAdapter.getRemoteAddress(), throwable.getMessage());
361 LOG.trace("DETAIL of sending of hello failure:", throwable);
362 resultFtr.cancel(false);
363 handshakeListener.onHandshakeFailure();
365 }, MoreExecutors.directExecutor());
366 LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
372 * after handshake set features, register to session.
374 * @param proposedVersion proposed openflow version
375 * @param xid transaction id
377 protected void postHandshake(final Short proposedVersion, final Long xid) {
379 version = proposedVersion;
381 LOG.debug("version set: {}", proposedVersion);
383 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
384 featuresBuilder.setVersion(version).setXid(xid);
385 LOG.debug("sending feature request for version={} and xid={}", version, xid);
386 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
388 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
389 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
391 public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) {
392 LOG.trace("features are back");
393 if (rpcFeatures.isSuccessful()) {
394 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
395 if (!deviceConnectionRateLimiter.tryAquire()) {
396 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
397 + " the connection from device {}", featureOutput.getDatapathId());
398 connectionAdapter.disconnect();
402 LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
403 LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
404 LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
405 version, featureOutput.getDatapathId(),
406 featureOutput.getAuxiliaryId());
407 handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
410 LOG.warn("issuing disconnect during handshake [{}]",
411 connectionAdapter.getRemoteAddress());
412 for (RpcError rpcError : rpcFeatures.getErrors()) {
413 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
414 rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
415 rpcError.getCause());
417 handshakeListener.onHandshakeFailure();
420 LOG.debug("postHandshake DONE");
424 public void onFailure(Throwable throwable) {
425 LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
426 connectionAdapter.getRemoteAddress(), throwable.getMessage());
427 LOG.trace("DETAIL of sending of hello failure:", throwable);
429 }, MoreExecutors.directExecutor());
430 LOG.debug("future features [{}] hooked ..", xid);
434 * Method for unit testing, only.
435 * This method is not thread safe and can only safely be used from a test.
438 @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
439 void setUseVersionBitmap(boolean useVersionBitmap) {
440 this.useVersionBitmap = useVersionBitmap;