2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
24 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
26 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.common.RpcError;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.util.concurrent.Futures;
59 public class ConnectionConductorImpl implements OpenflowProtocolListener,
60 SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
62 protected static final Logger LOG = LoggerFactory
63 .getLogger(ConnectionConductorImpl.class);
65 /* variable to make BitMap-based negotiation enabled / disabled.
66 * it will help while testing and isolating issues related to processing of
67 * BitMaps from switches.
69 private static final boolean isBitmapNegotiationEnable = true;
70 private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
72 protected final ConnectionAdapter connectionAdapter;
73 private ConnectionConductor.CONDUCTOR_STATE conductorState;
74 private Short version;
76 private SwitchConnectionDistinguisher auxiliaryKey;
78 private SessionContext sessionContext;
80 private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
82 protected boolean isFirstHelloNegotiation = true;
84 // TODO: use appropriate interface instead of Object
85 private QueueKeeper<Object> queueKeeper;
90 * @param connectionAdapter
92 public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
93 this.connectionAdapter = connectionAdapter;
94 conductorState = CONDUCTOR_STATE.HANDSHAKING;
95 new Thread(new ErrorQueueHandler(errorQueue)).start();
100 connectionAdapter.setMessageListener(this);
101 connectionAdapter.setSystemListener(this);
102 connectionAdapter.setConnectionReadyListener(this);
106 * @param queueKeeper the queueKeeper to set
109 public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
110 this.queueKeeper = queueKeeper;
115 * send first hello message to switch
117 protected void sendFirstHelloMessage() {
118 Short highestVersion = ConnectionConductor.versionOrder.get(0);
120 HelloInput helloInput = null;
122 if (isBitmapNegotiationEnable) {
123 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
124 LOG.debug("sending first hello message: vertsion header={} , version bitmap={}",
125 highestVersion, helloInput.getElements());
127 helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
128 LOG.debug("sending first hello message: version header={} ", highestVersion);
132 RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
133 smokeRpc(helloResult);
134 LOG.debug("FIRST HELLO sent.");
135 } catch (Throwable e) {
136 LOG.debug("FIRST HELLO sending failed.");
142 public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
143 new Thread(new Runnable() {
146 LOG.debug("echo request received: " + echoRequestMessage.getXid());
147 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148 builder.setVersion(echoRequestMessage.getVersion());
149 builder.setXid(echoRequestMessage.getXid());
150 builder.setData(echoRequestMessage.getData());
152 connectionAdapter.echoReply(builder.build());
158 public void onErrorMessage(ErrorMessage errorMessage) {
159 queueKeeper.push(ErrorMessage.class, errorMessage, this);
160 // notifyListeners(ErrorMessage.class, errorMessage);
164 public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
165 queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
166 // notifyListeners(ExperimenterMessage.class, experimenterMessage);
170 public void onFlowRemovedMessage(FlowRemovedMessage message) {
171 notifyListeners(FlowRemovedMessage.class, message);
176 * version negotiation happened as per following steps:
177 * 1. If HelloMessage version field has same version, continue connection processing.
178 * If HelloMessage version is lower than supported versions, just disconnect.
179 * 2. If HelloMessage contains bitmap and common version found in bitmap
180 * then continue connection processing. if no common version found, just disconnect.
181 * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182 * If Hello message received again with not supported version, just disconnect.
184 * TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
187 public void onHelloMessage(final HelloMessage hello) {
190 new Thread(new Runnable() {
193 LOG.info("handshake STARTED");
194 checkState(CONDUCTOR_STATE.HANDSHAKING);
196 Short remoteVersion = hello.getVersion();
197 List<Elements> elements = hello.getElements();
198 Long xid = hello.getXid();
199 Short proposedVersion;
200 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
202 // find the version from header version field
203 proposedVersion = proposeVersion(remoteVersion);
205 } catch (IllegalArgumentException e) {
207 connectionAdapter.disconnect();
211 // sent version is equal to remote --> version is negotiated
212 if (proposedVersion == remoteVersion) {
213 LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
214 sendHelloReply(proposedVersion, ++xid);
215 postHandshake(proposedVersion, ++xid);
217 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
219 // hello contains version bitmap, checking highest common
221 proposedVersion = proposeBitmapVersion(elements);
222 } catch (IllegalArgumentException ex) {
224 connectionAdapter.disconnect();
227 LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
228 sendHelloReply(proposedVersion, ++xid);
229 postHandshake(proposedVersion, ++xid);
231 if (isFirstHelloNegotiation) {
232 isFirstHelloNegotiation = false;
233 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
234 // send hello reply with lower version number supported
235 sendHelloReply(proposedVersion, ++xid);
237 // terminate the connection.
238 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
239 connectionAdapter.disconnect();
248 * @param proposedVersion
251 protected void sendHelloReply(Short proposedVersion, Long xid)
253 HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
254 RpcResult<Void> result;
256 result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
258 } catch (Throwable e) {
265 * @param futureResult
268 private static void smokeRpc(RpcResult<?> result) throws Throwable {
269 if (!result.isSuccessful()) {
270 Throwable firstCause = null;
271 StringBuffer sb = new StringBuffer();
272 for (RpcError error : result.getErrors()) {
273 if (firstCause != null) {
274 firstCause = error.getCause();
277 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
279 throw new Exception(sb.toString(), firstCause);
284 * after handshake set features, register to session
285 * @param proposedVersion
288 protected void postHandshake(Short proposedVersion, Long xid) {
290 version = proposedVersion;
291 LOG.debug("version set: " + proposedVersion);
293 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
294 featuresBuilder.setVersion(version).setXid(xid);
295 LOG.debug("sending feature request for version={} and xid={}", version, xid);
296 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
297 .getFeatures(featuresBuilder.build());
298 LOG.debug("waiting for features");
300 RpcResult<GetFeaturesOutput> rpcFeatures =
301 featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
302 smokeRpc(rpcFeatures);
304 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
305 LOG.debug("obtained features: datapathId={}",
306 featureOutput.getDatapathId());
307 LOG.debug("obtained features: auxiliaryId={}",
308 featureOutput.getAuxiliaryId());
309 conductorState = CONDUCTOR_STATE.WORKING;
311 OFSessionUtil.registerSession(this,
312 featureOutput, version);
313 this.setListenerMapping(OFSessionUtil.getListenersMap());
314 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
315 } catch (Throwable e) {
317 LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
324 * @return rpc-response timeout in [ms]
326 private long getMaxTimeout() {
327 // TODO:: get from configuration
332 * @return milliseconds
334 private TimeUnit getMaxTimeoutUnit() {
335 // TODO:: get from configuration
336 return TimeUnit.MILLISECONDS;
343 protected void handleException(Throwable e) {
344 String sessionKeyId = null;
345 if (getSessionContext() != null) {
346 sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
349 Exception causeAndThread = new Exception(
350 "IN THREAD: "+Thread.currentThread().getName() +
351 "; session:"+sessionKeyId, e);
353 errorQueue.put(causeAndThread);
354 } catch (InterruptedException e1) {
355 LOG.error(e1.getMessage(), e1);
360 public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
361 // TODO Auto-generated method stub
365 public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
366 // TODO Auto-generated method stub
370 public void onPacketInMessage(PacketInMessage message) {
371 notifyListeners(PacketInMessage.class, message);
375 public void onPortStatusMessage(PortStatusMessage message) {
376 this.getSessionContext().processPortStatusMsg(message);
377 notifyListeners(PortStatusMessage.class, message);
381 public void onSwitchIdleEvent(SwitchIdleEvent notification) {
382 if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
383 // idle state in any other conductorState than WORKING means real
384 // problem and wont be handled by echoReply, but disconnection
386 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
388 LOG.debug("first idle state occured");
389 EchoInputBuilder builder = new EchoInputBuilder();
390 builder.setVersion(version);
391 // TODO: get xid from sessionContext
394 Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
395 .echo(builder.build());
398 // TODO: read timeout from config
399 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
400 getMaxTimeoutUnit());
401 if (echoReplyValue.isSuccessful()) {
402 conductorState = CONDUCTOR_STATE.WORKING;
404 for (RpcError replyError : echoReplyValue.getErrors()) {
405 Throwable cause = replyError.getCause();
407 "while receiving echoReply in TIMEOUTING state: "
408 + cause.getMessage(), cause);
410 //switch issue occurred
411 throw new Exception("switch issue occurred");
413 } catch (Exception e) {
414 LOG.error("while waiting for echoReply in TIMEOUTING state: "
415 + e.getMessage(), e);
416 //switch is not responding
418 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
424 * @param conductorState
425 * the connectionState to set
428 public void setConductorState(CONDUCTOR_STATE conductorState) {
429 this.conductorState = conductorState;
433 public CONDUCTOR_STATE getConductorState() {
434 return conductorState;
440 protected void checkState(CONDUCTOR_STATE expectedState) {
441 if (!conductorState.equals(expectedState)) {
442 throw new IllegalStateException("Expected state: " + expectedState
443 + ", actual state:" + conductorState);
448 public void onDisconnectEvent(DisconnectEvent arg0) {
449 SessionManager sessionManager = OFSessionUtil.getSessionManager();
450 sessionManager.invalidateOnDisconnect(this);
454 * find supported version based on remoteVersion
455 * @param remoteVersion
458 protected short proposeVersion(short remoteVersion) {
459 Short proposal = null;
460 for (short offer : ConnectionConductor.versionOrder) {
461 if (offer <= remoteVersion) {
466 if (proposal == null) {
467 throw new IllegalArgumentException("unsupported version: "
474 * find common highest supported bitmap version
478 protected Short proposeBitmapVersion(List<Elements> list)
480 Short supportedHighestVersion = null;
481 if((null != list) && (0 != list.size()))
483 for(Elements element : list)
485 List<Boolean> bitmap = element.getVersionBitmap();
486 // check for version bitmap
487 for(short bitPos : ConnectionConductor.versionOrder)
489 // with all the version it should work.
490 if(bitmap.get(bitPos % Integer.SIZE))
492 supportedHighestVersion = bitPos;
497 if(null == supportedHighestVersion)
499 throw new IllegalArgumentException("unsupported bitmap version.");
504 return supportedHighestVersion;
508 public Short getVersion() {
513 public Future<Boolean> disconnect() {
514 LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
516 Future<Boolean> result = null;
517 if (connectionAdapter.isAlive()) {
518 result = connectionAdapter.disconnect();
520 LOG.debug("connection already disconnected");
521 result = Futures.immediateFuture(true);
528 public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
529 this.auxiliaryKey = auxiliaryKey;
533 public void setSessionContext(SessionContext sessionContext) {
534 this.sessionContext = sessionContext;
538 public SwitchConnectionDistinguisher getAuxiliaryKey() {
543 public SessionContext getSessionContext() {
544 return sessionContext;
548 * @param listenerMapping the listenerMapping to set
550 public void setListenerMapping(
551 Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
552 this.listenerMapping = listenerMapping;
558 * @deprecated use {@link QueueKeeper} strategy
561 private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
562 Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
563 if (listeners != null) {
564 for (IMDMessageListener listener : listeners) {
565 // Pass cookie only for PACKT_IN
566 if ( messageType.equals("PacketInMessage.class")){
567 listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
569 listener.receive(null, this.getSessionContext(), message);
573 LOG.warn("No listeners for this message Type {}", messageType);
578 public ConnectionAdapter getConnectionAdapter() {
579 return connectionAdapter;
583 public void onConnectionReady() {
584 LOG.debug("connection is ready-to-use");
585 //TODO: fire first helloMessage
586 new Thread(new Runnable() {
589 sendFirstHelloMessage();