2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core;
11 import java.util.List;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
17 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
18 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
20 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
41 import org.opendaylight.yangtools.yang.common.RpcError;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import com.google.common.util.concurrent.Futures;
51 public class ConnectionConductorImpl implements OpenflowProtocolListener,
52 SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
54 protected static final Logger LOG = LoggerFactory
55 .getLogger(ConnectionConductorImpl.class);
57 /* variable to make BitMap-based negotiation enabled / disabled.
58 * it will help while testing and isolating issues related to processing of
59 * BitMaps from switches.
61 private static final boolean isBitmapNegotiationEnable = true;
62 private ErrorHandler errorHandler;
64 private final ConnectionAdapter connectionAdapter;
65 private ConnectionConductor.CONDUCTOR_STATE conductorState;
66 private Short version;
68 private SwitchConnectionDistinguisher auxiliaryKey;
70 private SessionContext sessionContext;
72 protected boolean isFirstHelloNegotiation = true;
74 // TODO: use appropriate interface instead of Object
75 private QueueKeeper<Object> queueKeeper;
/**
 * Creates a conductor bound to the given connection adapter. The conductor
 * starts in the {@code HANDSHAKING} state.
 *
 * @param connectionAdapter adapter of the connection driven by this conductor
 */
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
    this.connectionAdapter = connectionAdapter;
    conductorState = CONDUCTOR_STATE.HANDSHAKING;
    // NOTE(review): the listener registrations that follow in the file are
    // separated from this statement by lines that are not visible here; it is
    // assumed they belong to a separate method - confirm before merging.
}
89 connectionAdapter.setMessageListener(this);
90 connectionAdapter.setSystemListener(this);
91 connectionAdapter.setConnectionReadyListener(this);
95 * @param queueKeeper the queueKeeper to set
98 public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
99 this.queueKeeper = queueKeeper;
103 * @param errorHandler the errorHandler to set
106 public void setErrorHandler(ErrorHandler errorHandler) {
107 this.errorHandler = errorHandler;
111 * send first hello message to switch
113 protected void sendFirstHelloMessage() {
114 Short highestVersion = ConnectionConductor.versionOrder.get(0);
116 HelloInput helloInput = null;
118 if (isBitmapNegotiationEnable) {
119 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
120 LOG.debug("sending first hello message: vertsion header={} , version bitmap={}",
121 highestVersion, MessageFactory.digVersions(helloInput.getElements()));
123 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
124 LOG.debug("sending first hello message: version header={} ", highestVersion);
128 RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
129 smokeRpc(helloResult);
130 LOG.debug("FIRST HELLO sent.");
131 } catch (Throwable e) {
132 LOG.debug("FIRST HELLO sending failed.");
133 errorHandler.handleException(e, getSessionContext());
138 public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
139 new Thread(new Runnable() {
142 LOG.debug("echo request received: " + echoRequestMessage.getXid());
143 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
144 builder.setVersion(echoRequestMessage.getVersion());
145 builder.setXid(echoRequestMessage.getXid());
146 builder.setData(echoRequestMessage.getData());
148 getConnectionAdapter().echoReply(builder.build());
154 public void onErrorMessage(ErrorMessage errorMessage) {
155 queueKeeper.push(ErrorMessage.class, errorMessage, this);
159 public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
160 queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
164 public void onFlowRemovedMessage(FlowRemovedMessage message) {
165 queueKeeper.push(FlowRemovedMessage.class, message, this);
170 * version negotiation happened as per following steps:
171 * 1. If HelloMessage version field has same version, continue connection processing.
172 * If HelloMessage version is lower than supported versions, just disconnect.
173 * 2. If HelloMessage contains bitmap and common version found in bitmap
174 * then continue connection processing. if no common version found, just disconnect.
175 * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
176 * If Hello message received again with not supported version, just disconnect.
178 * TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
181 public void onHelloMessage(final HelloMessage hello) {
184 new Thread(new Runnable() {
187 LOG.info("handshake STARTED");
188 checkState(CONDUCTOR_STATE.HANDSHAKING);
190 Short remoteVersion = hello.getVersion();
191 List<Elements> elements = hello.getElements();
192 Long xid = hello.getXid();
193 Short proposedVersion;
194 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, MessageFactory.digVersions(elements));
196 // find the version from header version field
197 proposedVersion = proposeVersion(remoteVersion);
199 } catch (IllegalArgumentException e) {
200 errorHandler.handleException(e, getSessionContext());
201 getConnectionAdapter().disconnect();
205 // sent version is equal to remote --> version is negotiated
206 if (proposedVersion == remoteVersion) {
207 LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
208 sendHelloReply(proposedVersion, ++xid);
209 postHandshake(proposedVersion, ++xid);
211 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
213 // hello contains version bitmap, checking highest common
215 proposedVersion = proposeBitmapVersion(elements);
216 } catch (IllegalArgumentException ex) {
217 errorHandler.handleException(ex, getSessionContext());
218 getConnectionAdapter().disconnect();
221 LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
222 sendHelloReply(proposedVersion, ++xid);
223 postHandshake(proposedVersion, ++xid);
225 if (isFirstHelloNegotiation) {
226 isFirstHelloNegotiation = false;
227 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
228 // send hello reply with lower version number supported
229 sendHelloReply(proposedVersion, ++xid);
231 // terminate the connection.
232 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
233 getConnectionAdapter().disconnect();
242 * @param proposedVersion
245 protected void sendHelloReply(Short proposedVersion, Long xid)
247 HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
248 RpcResult<Void> result;
250 result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
252 } catch (Throwable e) {
253 errorHandler.handleException(e, getSessionContext());
259 * @param futureResult
262 private static void smokeRpc(RpcResult<?> result) throws Throwable {
263 if (!result.isSuccessful()) {
264 Throwable firstCause = null;
265 StringBuffer sb = new StringBuffer();
266 for (RpcError error : result.getErrors()) {
267 if (firstCause != null) {
268 firstCause = error.getCause();
271 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
273 throw new Exception(sb.toString(), firstCause);
278 * after handshake set features, register to session
279 * @param proposedVersion
282 protected void postHandshake(Short proposedVersion, Long xid) {
284 version = proposedVersion;
285 LOG.debug("version set: " + proposedVersion);
287 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
288 featuresBuilder.setVersion(version).setXid(xid);
289 LOG.debug("sending feature request for version={} and xid={}", version, xid);
290 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
291 .getFeatures(featuresBuilder.build());
292 LOG.debug("waiting for features");
294 RpcResult<GetFeaturesOutput> rpcFeatures =
295 featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
296 smokeRpc(rpcFeatures);
298 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
299 LOG.debug("obtained features: datapathId={}",
300 featureOutput.getDatapathId());
301 LOG.debug("obtained features: auxiliaryId={}",
302 featureOutput.getAuxiliaryId());
303 conductorState = CONDUCTOR_STATE.WORKING;
305 OFSessionUtil.registerSession(this,
306 featureOutput, version);
307 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
308 } catch (Throwable e) {
310 LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
311 errorHandler.handleException(e, getSessionContext());
317 * @return rpc-response timeout in [ms]
319 protected long getMaxTimeout() {
320 // TODO:: get from configuration
325 * @return milliseconds
327 protected TimeUnit getMaxTimeoutUnit() {
328 // TODO:: get from configuration
329 return TimeUnit.MILLISECONDS;
333 public void onMultipartReplyMessage(MultipartReplyMessage message) {
334 queueKeeper.push(MultipartReplyMessage.class, message, this);
338 public void onMultipartRequestMessage(MultipartRequestMessage message) {
339 queueKeeper.push(MultipartRequestMessage.class, message, this);
343 public void onPacketInMessage(PacketInMessage message) {
344 queueKeeper.push(PacketInMessage.class, message, this);
348 public void onPortStatusMessage(PortStatusMessage message) {
349 this.getSessionContext().processPortStatusMsg(message);
350 queueKeeper.push(PortStatusMessage.class, message, this);
354 public void onSwitchIdleEvent(SwitchIdleEvent notification) {
355 new Thread(new Runnable() {
358 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
359 // idle state in any other conductorState than WORKING means real
360 // problem and wont be handled by echoReply, but disconnection
362 OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
364 LOG.debug("first idle state occured");
365 EchoInputBuilder builder = new EchoInputBuilder();
366 builder.setVersion(getVersion());
367 builder.setXid(getSessionContext().getNextXid());
369 Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
370 .echo(builder.build());
373 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
374 getMaxTimeoutUnit());
375 if (echoReplyValue.isSuccessful()) {
376 setConductorState(CONDUCTOR_STATE.WORKING);
378 for (RpcError replyError : echoReplyValue.getErrors()) {
379 Throwable cause = replyError.getCause();
381 "while receiving echoReply in TIMEOUTING state: "
382 + cause.getMessage(), cause);
384 //switch issue occurred
385 throw new Exception("switch issue occurred");
387 } catch (Exception e) {
388 LOG.error("while waiting for echoReply in TIMEOUTING state: "
389 + e.getMessage(), e);
390 //switch is not responding
392 OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
401 * @param conductorState
402 * the connectionState to set
405 public void setConductorState(CONDUCTOR_STATE conductorState) {
406 this.conductorState = conductorState;
410 public CONDUCTOR_STATE getConductorState() {
411 return conductorState;
417 protected void checkState(CONDUCTOR_STATE expectedState) {
418 if (!conductorState.equals(expectedState)) {
419 throw new IllegalStateException("Expected state: " + expectedState
420 + ", actual state:" + conductorState);
425 public void onDisconnectEvent(DisconnectEvent arg0) {
426 SessionManager sessionManager = OFSessionUtil.getSessionManager();
427 sessionManager.invalidateOnDisconnect(this);
431 * find supported version based on remoteVersion
432 * @param remoteVersion
435 protected short proposeVersion(short remoteVersion) {
436 Short proposal = null;
437 for (short offer : ConnectionConductor.versionOrder) {
438 if (offer <= remoteVersion) {
443 if (proposal == null) {
444 throw new IllegalArgumentException("unsupported version: "
451 * find common highest supported bitmap version
455 protected Short proposeBitmapVersion(List<Elements> list)
457 Short supportedHighestVersion = null;
458 if((null != list) && (0 != list.size()))
460 for(Elements element : list)
462 List<Boolean> bitmap = element.getVersionBitmap();
463 // check for version bitmap
464 for(short bitPos : ConnectionConductor.versionOrder)
466 // with all the version it should work.
467 if(bitmap.get(bitPos % Integer.SIZE))
469 supportedHighestVersion = bitPos;
474 if(null == supportedHighestVersion)
476 throw new IllegalArgumentException("unsupported bitmap version.");
481 return supportedHighestVersion;
485 public Short getVersion() {
490 public Future<Boolean> disconnect() {
491 LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
493 Future<Boolean> result = null;
494 if (connectionAdapter.isAlive()) {
495 result = connectionAdapter.disconnect();
497 LOG.debug("connection already disconnected");
498 result = Futures.immediateFuture(true);
505 public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
506 this.auxiliaryKey = auxiliaryKey;
510 public void setSessionContext(SessionContext sessionContext) {
511 this.sessionContext = sessionContext;
515 public SwitchConnectionDistinguisher getAuxiliaryKey() {
520 public SessionContext getSessionContext() {
521 return sessionContext;
525 public ConnectionAdapter getConnectionAdapter() {
526 return connectionAdapter;
530 public void onConnectionReady() {
531 LOG.debug("connection is ready-to-use");
532 new Thread(new Runnable() {
535 sendFirstHelloMessage();