2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
24 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import com.google.common.util.concurrent.Futures;
57 public class ConnectionConductorImpl implements OpenflowProtocolListener,
58 SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
60 protected static final Logger LOG = LoggerFactory
61 .getLogger(ConnectionConductorImpl.class);
63 /* variable to make BitMap-based negotiation enabled / disabled.
64 * it will help while testing and isolating issues related to processing of
65 * BitMaps from switches.
67 private static final boolean isBitmapNegotiationEnable = true;
68 private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
70 protected final ConnectionAdapter connectionAdapter;
71 private ConnectionConductor.CONDUCTOR_STATE conductorState;
72 private Short version;
74 private SwitchConnectionDistinguisher auxiliaryKey;
76 private SessionContext sessionContext;
78 private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
80 protected boolean isFirstHelloNegotiation = true;
85 * @param connectionAdapter
87 public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
88 this.connectionAdapter = connectionAdapter;
89 conductorState = CONDUCTOR_STATE.HANDSHAKING;
90 new Thread(new ErrorQueueHandler(errorQueue)).start();
95 connectionAdapter.setMessageListener(this);
96 connectionAdapter.setSystemListener(this);
97 connectionAdapter.setConnectionReadyListener(this);
102 * send first hello message to switch
104 protected void sendFirstHelloMessage() {
105 Short highestVersion = ConnectionConductor.versionOrder.get(0);
107 HelloInput helloInput = null;
109 if (isBitmapNegotiationEnable) {
110 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
111 LOG.debug("sending first hello message: vertsion header={} , version bitmap={}",
112 highestVersion, helloInput.getElements());
114 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
115 LOG.debug("sending first hello message: version header={} ", highestVersion);
119 RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
120 smokeRpc(helloResult);
121 LOG.debug("FIRST HELLO sent.");
122 } catch (Throwable e) {
123 LOG.debug("FIRST HELLO sending failed.");
129 public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
130 new Thread(new Runnable() {
133 LOG.debug("echo request received: " + echoRequestMessage.getXid());
134 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
135 builder.setVersion(echoRequestMessage.getVersion());
136 builder.setXid(echoRequestMessage.getXid());
137 builder.setData(echoRequestMessage.getData());
139 connectionAdapter.echoReply(builder.build());
145 public void onErrorMessage(ErrorMessage errorMessage) {
146 // TODO Auto-generated method stub
147 LOG.debug("error received, type: " + errorMessage.getType()
148 + "; code: " + errorMessage.getCode());
152 public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
153 LOG.debug("experimenter received, type: "
154 + experimenterMessage.getExpType());
155 notifyListeners(ExperimenterMessage.class, experimenterMessage);
159 public void onFlowRemovedMessage(FlowRemovedMessage message) {
160 notifyListeners(FlowRemovedMessage.class, message);
165 * version negotiation happened as per following steps:
166 * 1. If HelloMessage version field has same version, continue connection processing.
167 * If HelloMessage version is lower than supported versions, just disconnect.
168 * 2. If HelloMessage contains bitmap and common version found in bitmap
169 * then continue connection processing. if no common version found, just disconnect.
170 * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
171 * If Hello message received again with not supported version, just disconnect.
173 * TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
176 public void onHelloMessage(final HelloMessage hello) {
179 new Thread(new Runnable() {
182 LOG.info("handshake STARTED");
183 checkState(CONDUCTOR_STATE.HANDSHAKING);
185 Short remoteVersion = hello.getVersion();
186 List<Elements> elements = hello.getElements();
187 Long xid = hello.getXid();
188 Short proposedVersion;
189 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
191 // find the version from header version field
192 proposedVersion = proposeVersion(remoteVersion);
194 } catch (IllegalArgumentException e) {
196 connectionAdapter.disconnect();
200 // sent version is equal to remote --> version is negotiated
201 if (proposedVersion == remoteVersion) {
202 LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
203 sendHelloReply(proposedVersion, ++xid);
204 postHandshake(proposedVersion, ++xid);
206 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
208 // hello contains version bitmap, checking highest common
210 proposedVersion = proposeBitmapVersion(elements);
211 } catch (IllegalArgumentException ex) {
213 connectionAdapter.disconnect();
216 LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
217 sendHelloReply(proposedVersion, ++xid);
218 postHandshake(proposedVersion, ++xid);
220 if (isFirstHelloNegotiation) {
221 isFirstHelloNegotiation = false;
222 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
223 // send hello reply with lower version number supported
224 sendHelloReply(proposedVersion, ++xid);
226 // terminate the connection.
227 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
228 connectionAdapter.disconnect();
237 * @param proposedVersion
240 protected void sendHelloReply(Short proposedVersion, Long xid)
242 HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
243 RpcResult<Void> result;
245 result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
247 } catch (Throwable e) {
254 * @param futureResult
257 private static void smokeRpc(RpcResult<?> result) throws Throwable {
258 if (!result.isSuccessful()) {
259 Throwable firstCause = null;
260 StringBuffer sb = new StringBuffer();
261 for (RpcError error : result.getErrors()) {
262 if (firstCause != null) {
263 firstCause = error.getCause();
266 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
268 throw new Exception(sb.toString(), firstCause);
273 * after handshake set features, register to session
274 * @param proposedVersion
277 protected void postHandshake(Short proposedVersion, Long xid) {
279 version = proposedVersion;
280 LOG.debug("version set: " + proposedVersion);
282 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
283 featuresBuilder.setVersion(version).setXid(xid);
284 LOG.debug("sending feature request for version={} and xid={}", version, xid);
285 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
286 .getFeatures(featuresBuilder.build());
287 LOG.debug("waiting for features");
289 RpcResult<GetFeaturesOutput> rpcFeatures =
290 featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
291 smokeRpc(rpcFeatures);
293 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
294 LOG.debug("obtained features: datapathId={}",
295 featureOutput.getDatapathId());
296 LOG.debug("obtained features: auxiliaryId={}",
297 featureOutput.getAuxiliaryId());
298 conductorState = CONDUCTOR_STATE.WORKING;
300 OFSessionUtil.registerSession(this,
301 featureOutput, version);
302 this.setListenerMapping(OFSessionUtil.getListenersMap());
303 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
304 } catch (Throwable e) {
306 LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
313 * @return rpc-response timeout in [ms]
315 private long getMaxTimeout() {
316 // TODO:: get from configuration
321 * @return milliseconds
323 private TimeUnit getMaxTimeoutUnit() {
324 // TODO:: get from configuration
325 return TimeUnit.MILLISECONDS;
332 protected void handleException(Throwable e) {
333 String sessionKeyId = null;
334 if (getSessionContext() != null) {
335 sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
338 Exception causeAndThread = new Exception(
339 "IN THREAD: "+Thread.currentThread().getName() +
340 "; session:"+sessionKeyId, e);
342 errorQueue.put(causeAndThread);
343 } catch (InterruptedException e1) {
344 LOG.error(e1.getMessage(), e1);
349 public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
350 // TODO Auto-generated method stub
354 public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
355 // TODO Auto-generated method stub
359 public void onPacketInMessage(PacketInMessage message) {
360 notifyListeners(PacketInMessage.class, message);
364 public void onPortStatusMessage(PortStatusMessage message) {
365 this.getSessionContext().processPortStatusMsg(message);
366 notifyListeners(PortStatusMessage.class, message);
370 public void onSwitchIdleEvent(SwitchIdleEvent notification) {
371 if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
372 // idle state in any other conductorState than WORKING means real
373 // problem and wont be handled by echoReply, but disconnection
375 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
377 LOG.debug("first idle state occured");
378 EchoInputBuilder builder = new EchoInputBuilder();
379 builder.setVersion(version);
380 // TODO: get xid from sessionContext
383 Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
384 .echo(builder.build());
387 // TODO: read timeout from config
388 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
389 getMaxTimeoutUnit());
390 if (echoReplyValue.isSuccessful()) {
391 conductorState = CONDUCTOR_STATE.WORKING;
393 for (RpcError replyError : echoReplyValue.getErrors()) {
394 Throwable cause = replyError.getCause();
396 "while receiving echoReply in TIMEOUTING state: "
397 + cause.getMessage(), cause);
399 //switch issue occurred
400 throw new Exception("switch issue occurred");
402 } catch (Exception e) {
403 LOG.error("while waiting for echoReply in TIMEOUTING state: "
404 + e.getMessage(), e);
405 //switch is not responding
407 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
413 * @param conductorState
414 * the connectionState to set
417 public void setConductorState(CONDUCTOR_STATE conductorState) {
418 this.conductorState = conductorState;
422 public CONDUCTOR_STATE getConductorState() {
423 return conductorState;
429 protected void checkState(CONDUCTOR_STATE expectedState) {
430 if (!conductorState.equals(expectedState)) {
431 throw new IllegalStateException("Expected state: " + expectedState
432 + ", actual state:" + conductorState);
437 public void onDisconnectEvent(DisconnectEvent arg0) {
438 SessionManager sessionManager = OFSessionUtil.getSessionManager();
439 sessionManager.invalidateOnDisconnect(this);
443 * find supported version based on remoteVersion
444 * @param remoteVersion
447 protected short proposeVersion(short remoteVersion) {
448 Short proposal = null;
449 for (short offer : ConnectionConductor.versionOrder) {
450 if (offer <= remoteVersion) {
455 if (proposal == null) {
456 throw new IllegalArgumentException("unsupported version: "
463 * find common highest supported bitmap version
467 protected Short proposeBitmapVersion(List<Elements> list)
469 Short supportedHighestVersion = null;
470 if((null != list) && (0 != list.size()))
472 for(Elements element : list)
474 List<Boolean> bitmap = element.getVersionBitmap();
475 // check for version bitmap
476 for(short bitPos : ConnectionConductor.versionOrder)
478 // with all the version it should work.
479 if(bitmap.get(bitPos % Integer.SIZE))
481 supportedHighestVersion = bitPos;
486 if(null == supportedHighestVersion)
488 throw new IllegalArgumentException("unsupported bitmap version.");
493 return supportedHighestVersion;
497 public Short getVersion() {
502 public Future<Boolean> disconnect() {
503 LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
505 Future<Boolean> result = null;
506 if (connectionAdapter.isAlive()) {
507 result = connectionAdapter.disconnect();
509 LOG.debug("connection already disconnected");
510 result = Futures.immediateFuture(true);
517 public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
518 this.auxiliaryKey = auxiliaryKey;
522 public void setSessionContext(SessionContext sessionContext) {
523 this.sessionContext = sessionContext;
527 public SwitchConnectionDistinguisher getAuxiliaryKey() {
532 public SessionContext getSessionContext() {
533 return sessionContext;
537 * @param listenerMapping the listenerMapping to set
539 public void setListenerMapping(
540 Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
541 this.listenerMapping = listenerMapping;
548 private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
549 Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
550 if (listeners != null) {
551 for (IMDMessageListener listener : listeners) {
552 // Pass cookie only for PACKT_IN
553 if ( messageType.equals("PacketInMessage.class")){
554 listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
556 listener.receive(null, this.getSessionContext(), message);
560 LOG.warn("No listeners for this message Type {}", messageType);
565 public ConnectionAdapter getConnectionAdapter() {
566 return connectionAdapter;
570 public void onConnectionReady() {
571 LOG.debug("connection is ready-to-use");
572 //TODO: fire first helloMessage
573 new Thread(new Runnable() {
576 sendFirstHelloMessage();