2 * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.api.OFConstants;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
27 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
28 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
29 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
30 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
31 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
32 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl;
33 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
35 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.common.RpcError;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 import com.google.common.util.concurrent.Futures;
73 public class ConnectionConductorImpl implements OpenflowProtocolListener,
74 SystemNotificationsListener, ConnectionConductor,
75 ConnectionReadyListener, HandshakeListener, NotificationEnqueuer,
81 private static final int INGRESS_QUEUE_MAX_SIZE = 200;
83 protected static final Logger LOG = LoggerFactory
84 .getLogger(ConnectionConductorImpl.class);
87 * variable to make BitMap-based negotiation enabled / disabled. it will
88 * help while testing and isolating issues related to processing of BitMaps
91 private boolean isBitmapNegotiationEnable = true;
92 protected ErrorHandler errorHandler;
94 private final ConnectionAdapter connectionAdapter;
95 private ConnectionConductor.CONDUCTOR_STATE conductorState;
96 private Short version;
98 protected SwitchConnectionDistinguisher auxiliaryKey;
100 protected SessionContext sessionContext;
102 private QueueProcessor<OfHeader, DataObject> queueProcessor;
103 private QueueKeeper<OfHeader> queue;
104 private ThreadPoolExecutor hsPool;
105 private HandshakeManager handshakeManager;
107 private boolean firstHelloProcessed;
109 private PortFeaturesUtil portFeaturesUtils;
111 private int conductorId;
113 private int ingressMaxQueueSize;
116 * @param connectionAdapter
118 public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
119 this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
123 * @param connectionAdapter
124 * @param ingressMaxQueueSize
125 * ingress queue limit (blocking)
127 public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
128 int ingressMaxQueueSize) {
129 this.connectionAdapter = connectionAdapter;
130 this.ingressMaxQueueSize = ingressMaxQueueSize;
131 conductorState = CONDUCTOR_STATE.HANDSHAKING;
132 firstHelloProcessed = false;
133 handshakeManager = new HandshakeManagerImpl(connectionAdapter,
134 ConnectionConductor.versionOrder.get(0),
135 ConnectionConductor.versionOrder);
136 handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
137 handshakeManager.setHandshakeListener(this);
138 portFeaturesUtils = PortFeaturesUtil.getInstance();
143 int handshakeThreadLimit = 1;
144 hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
145 handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
146 new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
149 connectionAdapter.setMessageListener(this);
150 connectionAdapter.setSystemListener(this);
151 connectionAdapter.setConnectionReadyListener(this);
152 WaterMarkListener waterMarkListener = new WaterMarkListenerImpl(
154 queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor,
155 ingressMaxQueueSize, waterMarkListener);
159 public void setQueueProcessor(
160 QueueProcessor<OfHeader, DataObject> queueProcessor) {
161 this.queueProcessor = queueProcessor;
165 * @param errorHandler
166 * the errorHandler to set
169 public void setErrorHandler(ErrorHandler errorHandler) {
170 this.errorHandler = errorHandler;
171 handshakeManager.setErrorHandler(errorHandler);
175 public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
176 new Thread(new Runnable() {
179 LOG.debug("echo request received: "
180 + echoRequestMessage.getXid());
181 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
182 builder.setVersion(echoRequestMessage.getVersion());
183 builder.setXid(echoRequestMessage.getXid());
184 builder.setData(echoRequestMessage.getData());
186 getConnectionAdapter().echoReply(builder.build());
192 public void onErrorMessage(ErrorMessage errorMessage) {
193 enqueueMessage(errorMessage);
199 private void enqueueMessage(OfHeader message) {
200 enqueueMessage(message, QueueType.DEFAULT);
204 public void enqueueNotification(NotificationQueueWrapper notification) {
205 enqueueMessage(notification);
213 private void enqueueMessage(OfHeader message, QueueType queueType) {
214 queue.push(message, this, queueType);
218 public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
219 enqueueMessage(experimenterMessage);
223 public void onFlowRemovedMessage(FlowRemovedMessage message) {
224 enqueueMessage(message);
228 * version negotiation happened as per following steps: 1. If HelloMessage
229 * version field has same version, continue connection processing. If
230 * HelloMessage version is lower than supported versions, just disconnect.
231 * 2. If HelloMessage contains bitmap and common version found in bitmap
232 * then continue connection processing. if no common version found, just
233 * disconnect. 3. If HelloMessage version is not supported, send
234 * HelloMessage with lower supported version. 4. If Hello message received
235 * again with not supported version, just disconnect.
238 public void onHelloMessage(final HelloMessage hello) {
239 LOG.debug("processing HELLO.xid: {}", hello.getXid());
240 firstHelloProcessed = true;
241 checkState(CONDUCTOR_STATE.HANDSHAKING);
242 HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
243 hello, handshakeManager, connectionAdapter);
244 hsPool.submit(handshakeStepWrapper);
248 * @return rpc-response timeout in [ms]
250 protected long getMaxTimeout() {
251 // TODO:: get from configuration
256 * @return milliseconds
258 protected TimeUnit getMaxTimeoutUnit() {
259 // TODO:: get from configuration
260 return TimeUnit.MILLISECONDS;
264 public void onMultipartReplyMessage(MultipartReplyMessage message) {
265 enqueueMessage(message);
269 public void onPacketInMessage(PacketInMessage message) {
270 enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
274 public void onPortStatusMessage(PortStatusMessage message) {
275 processPortStatusMsg(message);
276 enqueueMessage(message);
279 protected void processPortStatusMsg(PortStatus msg) {
280 if (msg.getReason().getIntValue() == 2) {
282 } else if (msg.getReason().getIntValue() == 0) {
284 } else if (msg.getReason().getIntValue() == 1) {
289 protected void updatePort(PortStatus msg) {
290 Long portNumber = msg.getPortNo();
291 Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
293 if (portBandwidth == null) {
295 "can't get bandwidth info from port: {}, aborting port update",
298 this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
299 this.getSessionContext().getPortsBandwidth()
300 .put(portNumber, portBandwidth);
304 protected void deletePort(PortGrouping port) {
305 Long portNumber = port.getPortNo();
307 this.getSessionContext().getPhysicalPorts().remove(portNumber);
308 this.getSessionContext().getPortsBandwidth().remove(portNumber);
312 public void onSwitchIdleEvent(SwitchIdleEvent notification) {
313 new Thread(new Runnable() {
316 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
317 // idle state in any other conductorState than WORKING means
319 // problem and wont be handled by echoReply, but
322 OFSessionUtil.getSessionManager().invalidateOnDisconnect(
323 ConnectionConductorImpl.this);
326 "first idle state occured, sessionCtx={}|auxId={}",
327 sessionContext, auxiliaryKey);
328 EchoInputBuilder builder = new EchoInputBuilder();
329 builder.setVersion(getVersion());
330 builder.setXid(getSessionContext().getNextXid());
332 Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
333 .echo(builder.build());
336 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture
337 .get(getMaxTimeout(), getMaxTimeoutUnit());
338 if (echoReplyValue.isSuccessful()) {
339 setConductorState(CONDUCTOR_STATE.WORKING);
341 for (RpcError replyError : echoReplyValue
343 Throwable cause = replyError.getCause();
345 "while receiving echoReply in TIMEOUTING state: "
346 + cause.getMessage(), cause);
348 // switch issue occurred
349 throw new Exception("switch issue occurred");
351 } catch (Exception e) {
352 LOG.error("while waiting for echoReply in TIMEOUTING state: "
354 errorHandler.handleException(e, sessionContext);
355 // switch is not responding
357 OFSessionUtil.getSessionManager()
358 .invalidateOnDisconnect(
359 ConnectionConductorImpl.this);
368 * @param conductorState
369 * the connectionState to set
372 public void setConductorState(CONDUCTOR_STATE conductorState) {
373 this.conductorState = conductorState;
377 public CONDUCTOR_STATE getConductorState() {
378 return conductorState;
382 * @param expectedState
384 protected void checkState(CONDUCTOR_STATE expectedState) {
385 if (!conductorState.equals(expectedState)) {
386 throw new IllegalStateException("Expected state: " + expectedState
387 + ", actual state:" + conductorState);
392 public void onDisconnectEvent(DisconnectEvent arg0) {
393 SessionManager sessionManager = OFSessionUtil.getSessionManager();
394 sessionManager.invalidateOnDisconnect(this);
399 public Short getVersion() {
404 public Future<Boolean> disconnect() {
405 LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext,
408 Future<Boolean> result = null;
409 if (connectionAdapter.isAlive()) {
410 result = connectionAdapter.disconnect();
412 LOG.debug("connection already disconnected");
413 result = Futures.immediateFuture(true);
420 public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
421 this.auxiliaryKey = auxiliaryKey;
425 public void setSessionContext(SessionContext sessionContext) {
426 this.sessionContext = sessionContext;
430 public SwitchConnectionDistinguisher getAuxiliaryKey() {
435 public SessionContext getSessionContext() {
436 return sessionContext;
440 public ConnectionAdapter getConnectionAdapter() {
441 return connectionAdapter;
445 public void onConnectionReady() {
446 LOG.debug("connection is ready-to-use");
447 if (!firstHelloProcessed) {
448 HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
449 null, handshakeManager, connectionAdapter);
450 hsPool.execute(handshakeStepWrapper);
451 firstHelloProcessed = true;
453 LOG.debug("already touched by hello message");
458 public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
459 Short negotiatedVersion) {
460 postHandshakeBasic(featureOutput, negotiatedVersion);
462 // post-handshake actions
463 if (version == OFConstants.OFP_VERSION_1_3) {
465 requestGroupFeatures();
466 requestMeterFeatures();
473 public void onHandshakeFailure() {
474 LOG.info("OF handshake failed, doing cleanup.");
481 * @param featureOutput
482 * @param negotiatedVersion
484 protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
485 Short negotiatedVersion) {
486 version = negotiatedVersion;
487 if (version == OFConstants.OFP_VERSION_1_0) {
488 // Because the GetFeaturesOutput contains information about the port
489 // in OF1.0 (that we would otherwise get from the PortDesc) we have
491 // it up for parsing to convert into a NodeConnectorUpdate
493 // BUG-1988 - this must be the first item in queue in order not to
494 // get behind link-up message
495 enqueueMessage(featureOutput);
498 OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
501 conductorState = CONDUCTOR_STATE.WORKING;
502 QueueKeeperFactory.plugQueue(queueProcessor, queue);
506 * Send an OFPMP_DESC request message to the switch
508 private void requestDesc() {
509 MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
510 builder.setType(MultipartType.OFPMPDESC);
511 builder.setVersion(getVersion());
512 builder.setFlags(new MultipartRequestFlags(false));
513 builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
515 builder.setXid(getSessionContext().getNextXid());
516 getConnectionAdapter().multipartRequest(builder.build());
519 private void requestPorts() {
520 MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
521 builder.setType(MultipartType.OFPMPPORTDESC);
522 builder.setVersion(getVersion());
523 builder.setFlags(new MultipartRequestFlags(false));
524 builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
526 builder.setXid(getSessionContext().getNextXid());
527 getConnectionAdapter().multipartRequest(builder.build());
530 private void requestGroupFeatures() {
531 MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
532 mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
533 mprInput.setVersion(getVersion());
534 mprInput.setFlags(new MultipartRequestFlags(false));
535 mprInput.setXid(getSessionContext().getNextXid());
537 MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
538 mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
540 LOG.debug("Send group features statistics request :{}",
541 mprGroupFeaturesBuild);
542 getConnectionAdapter().multipartRequest(mprInput.build());
546 private void requestMeterFeatures() {
547 MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
548 mprInput.setType(MultipartType.OFPMPMETERFEATURES);
549 mprInput.setVersion(getVersion());
550 mprInput.setFlags(new MultipartRequestFlags(false));
551 mprInput.setXid(getSessionContext().getNextXid());
553 MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
554 mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
556 LOG.debug("Send meter features statistics request :{}",
557 mprMeterFeaturesBuild);
558 getConnectionAdapter().multipartRequest(mprInput.build());
563 * @param isBitmapNegotiationEnable
564 * the isBitmapNegotiationEnable to set
566 public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
567 this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
570 protected void shutdownPool() {
571 hsPool.shutdownNow();
572 LOG.debug("pool is terminated: {}", hsPool.isTerminated());
575 protected void shutdownPoolPolitely() {
578 hsPool.awaitTermination(1, TimeUnit.SECONDS);
579 } catch (InterruptedException e) {
580 LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
584 LOG.debug("pool is terminated: {}", hsPool.isTerminated());
588 public void setId(int conductorId) {
589 this.conductorId = conductorId;
593 public void close() {
594 shutdownPoolPolitely();
595 conductorState = CONDUCTOR_STATE.RIP;