2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import com.google.common.collect.Lists;
53 import com.google.common.util.concurrent.Futures;
58 public class ConnectionConductorImpl implements OpenflowProtocolListener,
59 SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
61 private static final Logger LOG = LoggerFactory
62 .getLogger(ConnectionConductorImpl.class);
64 /* variable to make BitMap-based negotiation enabled / disabled.
65 * it will help while testing and isolating issues related to processing of
66 * BitMaps from switches.
68 private static final boolean isBitmapNegotiationEnable = true;
69 private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
71 private final ConnectionAdapter connectionAdapter;
72 private final List<Short> versionOrder;
73 private ConnectionConductor.CONDUCTOR_STATE conductorState;
74 private Short version;
76 private SwitchConnectionDistinguisher auxiliaryKey;
78 private SessionContext sessionContext;
80 private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
82 private boolean isFirstHelloNegotiation = true;
87 * @param connectionAdapter
89 public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
90 this.connectionAdapter = connectionAdapter;
91 conductorState = CONDUCTOR_STATE.HANDSHAKING;
92 versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
93 // TODO: add a thread pool to handle ErrorQueueHandler
94 new Thread(new ErrorQueueHandler(errorQueue)).start();
99 connectionAdapter.setMessageListener(this);
100 connectionAdapter.setSystemListener(this);
101 connectionAdapter.setConnectionReadyListener(this);
106 * send first hello message to switch
108 private void sendFirstHelloMessage() {
109 short highestVersion = versionOrder.get(0);
111 HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
112 helloInputbuilder.setVersion(highestVersion);
113 helloInputbuilder.setXid(helloXid);
114 if (isBitmapNegotiationEnable) {
115 int elementsCount = highestVersion / Integer.SIZE;
116 ElementsBuilder elementsBuilder = new ElementsBuilder();
118 List<Elements> elementList = new ArrayList<Elements>();
119 int orderIndex = versionOrder.size();
120 int value = versionOrder.get(--orderIndex);
121 for (int index = 0; index <= elementsCount; index++) {
122 List<Boolean> booleanList = new ArrayList<Boolean>();
123 for (int i = 0; i < Integer.SIZE; i++) {
124 if (value == ((index * Integer.SIZE) + i)) {
125 booleanList.add(true);
126 value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
128 booleanList.add(false);
131 elementsBuilder.setType(HelloElementType.forValue(1));
132 elementsBuilder.setVersionBitmap(booleanList);
133 elementList.add(elementsBuilder.build());
135 helloInputbuilder.setElements(elementList);
136 LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
138 LOG.debug("sending first hello message: version header={} ", highestVersion);
140 connectionAdapter.hello(helloInputbuilder.build());
145 public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
146 LOG.debug("echo request received: " + echoRequestMessage.getXid());
147 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148 builder.setVersion(echoRequestMessage.getVersion());
149 builder.setXid(echoRequestMessage.getXid());
150 builder.setData(echoRequestMessage.getData());
152 connectionAdapter.echoReply(builder.build());
156 public void onErrorMessage(ErrorMessage errorMessage) {
157 // TODO Auto-generated method stub
158 LOG.debug("error received, type: " + errorMessage.getType()
159 + "; code: " + errorMessage.getCode());
163 public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
164 LOG.debug("experimenter received, type: "
165 + experimenterMessage.getExpType());
166 notifyListeners(ExperimenterMessage.class, experimenterMessage);
170 public void onFlowRemovedMessage(FlowRemovedMessage message) {
171 notifyListeners(FlowRemovedMessage.class, message);
176 * version negotiation happened as per following steps:
177 * 1. If HelloMessage version field has same version, continue connection processing.
178 * If HelloMessage version is lower than supported versions, just disconnect.
179 * 2. If HelloMessage contains bitmap and common version found in bitmap
180 * then continue connection processing. if no common version found, just disconnect.
181 * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182 * If Hello message received again with not supported version, just disconnect.
184 * TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
187 public void onHelloMessage(final HelloMessage hello) {
189 LOG.info("handshake STARTED");
190 checkState(CONDUCTOR_STATE.HANDSHAKING);
192 new Thread(new Runnable() {
196 Short remoteVersion = hello.getVersion();
197 List<Elements> elements = hello.getElements();
198 long xid = hello.getXid();
199 short proposedVersion;
200 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
202 // find the version from header version field
203 proposedVersion = proposeVersion(remoteVersion);
205 } catch (IllegalArgumentException e) {
207 connectionAdapter.disconnect();
211 // sent version is equal to remote --> version is negotiated
212 if (proposedVersion == remoteVersion) {
213 LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
214 sendHelloReply(proposedVersion, ++xid);
215 postHandshake(proposedVersion, ++xid);
217 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
219 // hello contains version bitmap, checking highest common
221 proposedVersion = proposeBitmapVersion(elements);
222 } catch (IllegalArgumentException ex) {
224 connectionAdapter.disconnect();
227 LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
228 sendHelloReply(proposedVersion, ++xid);
229 postHandshake(proposedVersion, ++xid);
231 if (isFirstHelloNegotiation) {
232 isFirstHelloNegotiation = false;
233 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
234 // send hello reply with lower version number supported
235 sendHelloReply(proposedVersion, ++xid);
237 // terminate the connection.
238 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
239 connectionAdapter.disconnect();
249 * @param proposedVersion
252 private void sendHelloReply(Short proposedVersion, Long xid)
254 HelloInputBuilder helloBuilder = new HelloInputBuilder();
255 helloBuilder.setVersion(proposedVersion).setXid(xid);
256 connectionAdapter.hello(helloBuilder.build());
261 * after handshake set features, register to session
262 * @param proposedVersion
265 private void postHandshake(Short proposedVersion, Long xid) {
267 version = proposedVersion;
268 LOG.debug("version set: " + proposedVersion);
270 GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
271 featuresBuilder.setVersion(version).setXid(xid);
272 LOG.debug("sending feature request for version={} and xid={}", version, xid);
273 Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
274 .getFeatures(featuresBuilder.build());
275 LOG.debug("waiting for features");
276 RpcResult<GetFeaturesOutput> rpcFeatures;
278 rpcFeatures = featuresFuture.get(getMaxTimeout(),
279 TimeUnit.MILLISECONDS);
280 if (!rpcFeatures.isSuccessful()) {
281 LOG.error("obtained features problem: {}"
282 , rpcFeatures.getErrors());
284 GetFeaturesOutput featureOutput = rpcFeatures.getResult();
285 LOG.debug("obtained features: datapathId={}",
286 featureOutput.getDatapathId());
287 LOG.debug("obtained features: auxiliaryId={}",
288 featureOutput.getAuxiliaryId());
289 conductorState = CONDUCTOR_STATE.WORKING;
291 OFSessionUtil.registerSession(this,
292 featureOutput, version);
293 this.setListenerMapping(OFSessionUtil.getListenersMap());
294 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
296 } catch (Exception e) {
298 LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
305 * @return rpc-response timeout in [ms]
307 private long getMaxTimeout() {
308 // TODO:: get from configuration
315 private void handleException(Exception e) {
316 Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
318 errorQueue.put(causeAndThread);
319 } catch (InterruptedException e1) {
320 LOG.error(e1.getMessage(), e1);
325 public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
326 // TODO Auto-generated method stub
330 public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
331 // TODO Auto-generated method stub
335 public void onPacketInMessage(PacketInMessage message) {
336 notifyListeners(PacketInMessage.class, message);
340 public void onPortStatusMessage(PortStatusMessage message) {
341 this.getSessionContext().processPortStatusMsg(message);
342 notifyListeners(PortStatusMessage.class, message);
346 public void onSwitchIdleEvent(SwitchIdleEvent notification) {
347 if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
348 // idle state in any other conductorState than WORKING means real
349 // problem and wont be handled by echoReply, but disconnection
351 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
353 LOG.debug("first idle state occured");
354 EchoInputBuilder builder = new EchoInputBuilder();
355 builder.setVersion(version);
356 // TODO: get xid from sessionContext
359 Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
360 .echo(builder.build());
363 // TODO: read timeout from config
364 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
366 if (echoReplyValue.isSuccessful()) {
367 conductorState = CONDUCTOR_STATE.WORKING;
369 for (RpcError replyError : echoReplyValue.getErrors()) {
370 Throwable cause = replyError.getCause();
372 "while receiving echoReply in TIMEOUTING state: "
373 + cause.getMessage(), cause);
375 //switch issue occurred
376 throw new Exception("switch issue occurred");
378 } catch (Exception e) {
379 LOG.error("while waiting for echoReply in TIMEOUTING state: "
380 + e.getMessage(), e);
381 //switch is not responding
383 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
389 * @param conductorState
390 * the connectionState to set
393 public void setConductorState(CONDUCTOR_STATE conductorState) {
394 this.conductorState = conductorState;
398 public CONDUCTOR_STATE getConductorState() {
399 return conductorState;
405 private void checkState(CONDUCTOR_STATE expectedState) {
406 if (!conductorState.equals(expectedState)) {
407 throw new IllegalStateException("Expected state: " + expectedState
408 + ", actual state:" + conductorState);
413 public void onDisconnectEvent(DisconnectEvent arg0) {
414 SessionManager sessionManager = OFSessionUtil.getSessionManager();
415 sessionManager.invalidateOnDisconnect(this);
419 * find supported version based on remoteVersion
420 * @param remoteVersion
423 protected short proposeVersion(short remoteVersion) {
424 Short proposal = null;
425 for (short offer : versionOrder) {
426 if (offer <= remoteVersion) {
431 if (proposal == null) {
432 throw new IllegalArgumentException("unsupported version: "
439 * find common highest supported bitmap version
443 protected short proposeBitmapVersion(List<Elements> list)
445 Short supportedHighestVersion = null;
446 if((null != list) && (0 != list.size()))
448 for(Elements element : list)
450 List<Boolean> bitmap = element.getVersionBitmap();
451 // check for version bitmap
452 for(short bitPos : versionOrder)
454 // with all the version it should work.
455 if(bitmap.get(bitPos % Integer.SIZE))
457 supportedHighestVersion = bitPos;
462 if(null == supportedHighestVersion)
464 throw new IllegalArgumentException("unsupported bitmap version.");
469 return supportedHighestVersion;
473 public Short getVersion() {
478 public Future<Boolean> disconnect() {
479 LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
481 Future<Boolean> result = null;
482 if (connectionAdapter.isAlive()) {
483 result = connectionAdapter.disconnect();
485 LOG.debug("connection already disconnected");
486 result = Futures.immediateFuture(true);
493 public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
494 this.auxiliaryKey = auxiliaryKey;
498 public void setSessionContext(SessionContext sessionContext) {
499 this.sessionContext = sessionContext;
503 public SwitchConnectionDistinguisher getAuxiliaryKey() {
508 public SessionContext getSessionContext() {
509 return sessionContext;
513 * @param listenerMapping the listenerMapping to set
515 public void setListenerMapping(
516 Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
517 //TODO: adjust the listener interface
518 this.listenerMapping = listenerMapping;
525 private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
526 Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
527 if (listeners != null) {
528 for (IMDMessageListener listener : listeners) {
529 // Pass cookie only for PACKT_IN
530 if ( messageType.equals("PacketInMessage.class")){
531 listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
533 listener.receive(null, this.getSessionContext(), message);
537 LOG.warn("No listeners for this message Type {}", messageType);
542 public ConnectionAdapter getConnectionAdapter() {
543 return connectionAdapter;
547 public void onConnectionReady() {
548 // TODO Auto-generated method stub