2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
29 import org.opendaylight.yangtools.yang.common.RpcError;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
// HandshakeManager implementation. Judging by the methods visible in this file it
// exchanges HELLO messages through the ConnectionAdapter to settle on a protocol
// version, then requests the switch features and reports the outcome to the
// registered HandshakeListener.
38 public class HandshakeManagerImpl implements HandshakeManager {
40 private static final Logger LOG = LoggerFactory
41 .getLogger(HandshakeManagerImpl.class);
// version most recently proposed by this side; null until a first proposal was made
43 private Short lastProposedVersion;
// remote version recorded by the previous checkNegotiationStalling call
44 private Short lastReceivedVersion;
// supported versions handed in through the constructor; iterated by proposeNextVersion
// and passed to MessageFactory.createHelloInput
45 private final List<Short> versionOrder;
47 //private HelloMessage receivedHello;
// used to send HELLO and features requests
48 private final ConnectionAdapter connectionAdapter;
// negotiated version; shake() ignores further HELLOs once this is non-null
49 private Short version;
// receives exceptions raised while negotiating; injected via setErrorHandler
50 private ErrorHandler errorHandler;
// NOTE(review): maxTimeout / maxTimeoutUnit are not read anywhere in the visible code - confirm they are still needed
52 private long maxTimeout = 8000;
53 private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
// version sent in the very first HELLO; also the comparison fallback in stepByStepVersionSubStep
54 private Short highestVersion;
// current transaction id; overwritten by setActiveXid, presumably advanced by getNextXid (body not visible)
56 private Long activeXid;
// notified about handshake success or failure; injected via setHandshakeListener
58 private HandshakeListener handshakeListener;
// enables the version-bitmap negotiation path in shake() when the remote HELLO carries a bitmap
60 private boolean useVersionBitmap;
/**
 * Creates a handshake manager bound to a single connection.
 *
 * @param connectionAdapter adapter used to send HELLO and features requests
 * @param highestVersion version offered in the first HELLO sent by this side
 * @param versionOrder supported versions, consulted when a version has to be proposed
 */
public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
        List<Short> versionOrder) {
    this.connectionAdapter = connectionAdapter;
    this.versionOrder = versionOrder;
    this.highestVersion = highestVersion;
}
75 public void setHandshakeListener(HandshakeListener handshakeListener) {
76 this.handshakeListener = handshakeListener;
// Entry point of the handshake; synchronized on this manager.
// - version already set: the HELLO is logged and ignored.
// - receivedHello == null: the first HELLO is sent with highestVersion.
// - otherwise: the received HELLO is handled by version-bitmap negotiation (when
//   useVersionBitmap is set and the remote side supplied a bitmap) or by the
//   step-by-step negotiation.
// Any exception is handed to errorHandler and reported as a handshake failure.
// NOTE(review): the embedded line numbers skip 87-89, 91-93, 99-101, 113 and 116 inside
// this method, so early returns, the try opener and the else branch are not visible here.
80 public synchronized void shake(HelloMessage receivedHello) {
82 if (version != null) {
83 // Some switches respond with a second HELLO acknowledging our HELLO
84 // but we've already completed the handshake based on the negotiated
85 // version and have registered this switch.
86 LOG.debug("Hello recieved after handshake already settled ... ignoring.");
90 LOG.trace("handshake STARTED");
94 if (receivedHello == null) {
95 // first Hello sending
96 sendHelloMessage(highestVersion, getNextXid());
// remember what was offered so the next received HELLO can be compared against it
97 lastProposedVersion = highestVersion;
98 LOG.trace("ret - firstHello+wait");
102 // process the second and any later HELLO
103 Short remoteVersion = receivedHello.getVersion();
104 List<Elements> elements = receivedHello.getElements();
// continue numbering from the xid of the received HELLO
105 setActiveXid(receivedHello.getXid());
106 List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
107 LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
108 receivedHello.getXid(), remoteVersionBitmap);
110 if (useVersionBitmap && remoteVersionBitmap != null) {
111 // versionBitmap on both sides -> ONE STEP DECISION
112 handleVersionBitmapNegotiation(elements);
114 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
115 handleStepByStepVersionNegotiation(remoteVersion);
117 } catch (Exception ex) {
// NOTE(review): errorHandler and handshakeListener are only set through setters - confirm they are always injected before shake() runs
118 errorHandler.handleException(ex, null);
119 LOG.trace("ret - shake fail - closing");
120 handshakeListener.onHandshakeFailure();
125 * @param remoteVersion
128 private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
129 LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
130 remoteVersion, lastProposedVersion, highestVersion);
132 if (lastProposedVersion == null) {
133 // first hello has not been sent yet, send it and either wait for next remote
134 // version or proceed
135 lastProposedVersion = proposeNextVersion(remoteVersion);
136 final Long nextHelloXid = getNextXid();
137 ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
138 Futures.addCallback(helloResult, new FutureCallback<Void>() {
140 public void onSuccess(Void result) {
142 stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
143 } catch (Exception e) {
144 errorHandler.handleException(e, null);
145 handshakeListener.onHandshakeFailure();
150 public void onFailure(Throwable t) {
151 LOG.info("hello sending seriously failed [{}]", nextHelloXid);
152 LOG.trace("detail of hello send problem", t);
156 stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
160 private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
161 if (remoteVersion == lastProposedVersion) {
162 postHandshake(lastProposedVersion, getNextXid());
163 LOG.trace("ret - OK - switch answered with lastProposedVersion");
165 checkNegotiationStalling(remoteVersion);
167 if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
168 // wait for next version
169 LOG.trace("ret - wait");
171 //propose lower version
172 handleLowerVersionProposal(remoteVersion);
178 * @param remoteVersion
181 private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
182 Short proposedVersion;
183 // find the version from header version field
184 proposedVersion = proposeNextVersion(remoteVersion);
185 lastProposedVersion = proposedVersion;
186 sendHelloMessage(proposedVersion, getNextXid());
188 if (! Objects.equals(proposedVersion, remoteVersion)) {
189 LOG.trace("ret - sent+wait");
191 LOG.trace("ret - sent+OK");
192 postHandshake(proposedVersion, getNextXid());
// One-step negotiation based on the version bitmap of the received HELLO elements.
// The common version is computed by proposeCommonBitmapVersion. When no HELLO has been
// proposed from this side yet, one is sent first and postHandshake runs from the
// success callback of that send; otherwise postHandshake is invoked directly.
// NOTE(review): the single line of the onFailure body (embedded line 215) is not visible
// in this view - confirm whether the failure is intentionally ignored there.
200 private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
201 final Short proposedVersion = proposeCommonBitmapVersion(elements);
202 if (lastProposedVersion == null) {
203 // first hello has not been sent yet
204 Long nexHelloXid = getNextXid();
205 ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
206 Futures.addCallback(helloDone, new FutureCallback<Void>() {
208 public void onSuccess(Void result) {
209 LOG.trace("ret - DONE - versionBitmap");
// the features request gets a fresh xid
210 postHandshake(proposedVersion, getNextXid());
214 public void onFailure(Throwable t) {
218 LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
// a HELLO was already proposed earlier: settle on the common bitmap version right away
220 LOG.trace("ret - DONE - versionBitmap");
221 postHandshake(proposedVersion, getNextXid());
// Supplies the xid used by the callers in this class for the next outgoing HELLO or
// features request.
// NOTE(review): the method body is not visible in this view - confirm how activeXid is
// advanced and what happens while activeXid is still null.
229 private Long getNextXid() {
237 private void setActiveXid(Long xid) {
238 this.activeXid = xid;
242 * @param remoteVersion
244 private void checkNegotiationStalling(Short remoteVersion) {
245 if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
246 throw new IllegalStateException("version negotiation stalled: version = "+remoteVersion);
248 lastReceivedVersion = remoteVersion;
// Public accessor returning a Short version.
// NOTE(review): the body is not visible in this view - presumably it returns the
// negotiated "version" field; confirm.
252 public Short getVersion() {
257 * find common highest supported bitmap version
// Looks for a version that is both listed in ConnectionConductor.versionOrder and set
// in the version bitmap of the given HELLO elements. Throws IllegalArgumentException
// when the check for a missing common version fires. For a null or empty list the
// initial null value reaches the return statement.
// NOTE(review): embedded lines 271-275 and 279-281 are not visible, so it cannot be told
// from here whether the inner loop stops at the first match or whether the "no common
// version" check sits inside or after the per-element loop - confirm against the full file.
261 protected Short proposeCommonBitmapVersion(List<Elements> list) {
262 Short supportedHighestVersion = null;
263 if((null != list) && (0 != list.size())) {
264 for(Elements element : list) {
// NOTE(review): bitmap is dereferenced below without a null or size check - confirm getVersionBitmap() never returns null or a short list
265 List<Boolean> bitmap = element.getVersionBitmap();
266 // check for version bitmap
// NOTE(review): iterates the static ConnectionConductor.versionOrder, not the versionOrder instance field - confirm this is intended
267 for(short bitPos : ConnectionConductor.versionOrder) {
268 // with all the version it should work.
// the version number, taken modulo Integer.SIZE, is the index into the bitmap
269 if(bitmap.get(bitPos % Integer.SIZE)) {
270 supportedHighestVersion = bitPos;
276 if(null == supportedHighestVersion) {
277 LOG.trace("versionBitmap: no common version found");
278 throw new IllegalArgumentException("no common version found in versionBitmap");
282 return supportedHighestVersion;
286 * find supported version based on remoteVersion
287 * @param remoteVersion
// Chooses a version from versionOrder based on the remote version: candidates are the
// entries that are lower than or equal to remoteVersion. An IllegalArgumentException is
// thrown when no proposal was found.
// NOTE(review): embedded lines 294-297 and 300-304 (loop body, end of the exception
// message, return) are not visible in this view - confirm which matching entry wins.
290 protected short proposeNextVersion(short remoteVersion) {
291 Short proposal = null;
292 for (short offer : versionOrder) {
293 if (offer <= remoteVersion) {
298 if (proposal == null) {
299 throw new IllegalArgumentException("no equal or lower version found, unsupported version: "
306 * send hello reply without versionBitmap
307 * @param helloVersion
// Builds a HELLO for the given version and xid via MessageFactory.createHelloInput,
// sends it through connectionAdapter.hello and attaches a callback to the RPC result.
// On an unsuccessful RPC result or on a failed future the callback logs the problem,
// cancels resultFtr and reports a handshake failure to the listener.
// NOTE(review): embedded lines 329 (success branch) and 354 (presumably the return of
// resultFtr) are not visible in this view - confirm how resultFtr is completed on
// success and that it is the value returned.
311 private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
312 //Short highestVersion = ConnectionConductor.versionOrder.get(0);
313 //final Long helloXid = 21L;
314 HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
// future created here and cancelled in the failure paths of the callback below
316 final SettableFuture<Void> resultFtr = SettableFuture.create();
318 LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
319 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
321 Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
// adapt the plain Future so that a callback can be attached
323 ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
324 Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
326 public void onSuccess(RpcResult<Void> result) {
327 if (result.isSuccessful()) {
328 LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
// RPC completed but reported errors: log each of them, then fail the handshake
331 for (RpcError error : result.getErrors()) {
332 LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
333 error.getInfo(), error.getSeverity(), error.getMessage(),
334 connectionAdapter.getRemoteAddress());
335 if (error.getCause() != null) {
336 LOG.trace("DETAIL of sending hello failure", error.getCause());
339 resultFtr.cancel(false);
340 handshakeListener.onHandshakeFailure();
345 public void onFailure(Throwable t) {
346 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
347 connectionAdapter.getRemoteAddress(), t.getMessage());
348 LOG.trace("DETAIL of sending of hello failure:", t);
349 resultFtr.cancel(false);
350 handshakeListener.onHandshakeFailure();
353 LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
359 * after handshake set features, register to session
360 * @param proposedVersion
// Settles the negotiated version and requests the switch features with the given xid.
// The callback on the features future reports success to the listener together with the
// received features, or a handshake failure when the RPC result is not successful.
// NOTE(review): embedded lines 390-391 and 396-398 are not visible in this view (else
// branch opener and the last argument of the features-failure log call).
363 protected void postHandshake(final Short proposedVersion, final Long xid) {
// from here on shake() treats the handshake as settled and ignores further HELLOs
365 version = proposedVersion;
367 LOG.debug("version set: {}", proposedVersion);
369 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
370 featuresBuilder.setVersion(version).setXid(xid);
371 LOG.debug("sending feature request for version={} and xid={}", version, xid);
372 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
373 .getFeatures(featuresBuilder.build());
// adapt the plain Future so that a callback can be attached
375 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
376 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
378 public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
379 LOG.trace("features are back");
380 if (rpcFeatures.isSuccessful()) {
381 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
383 LOG.debug("obtained features: datapathId={}",
384 featureOutput.getDatapathId());
385 LOG.debug("obtained features: auxiliaryId={}",
386 featureOutput.getAuxiliaryId());
387 LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
388 version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
389 handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
// features RPC completed but was not successful: log the errors and fail the handshake
392 LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
393 for (RpcError rpcError : rpcFeatures.getErrors()) {
394 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
395 rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
399 handshakeListener.onHandshakeFailure();
402 LOG.debug("postHandshake DONE");
// NOTE(review): the visible lines of this failure path only log; handshakeListener is not
// notified and "version" stays set - confirm whether onHandshakeFailure() should be called.
// The trace message below also mentions "hello" although this is the features request.
406 public void onFailure(Throwable t) {
407 LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
408 connectionAdapter.getRemoteAddress(), t.getMessage());
409 LOG.trace("DETAIL of sending of hello failure:", t);
413 LOG.debug("future features [{}] hooked ..", xid);
418 public void setUseVersionBitmap(boolean useVersionBitmap) {
419 this.useVersionBitmap = useVersionBitmap;
423 public void setErrorHandler(ErrorHandler errorHandler) {
424 this.errorHandler = errorHandler;