2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
27 import org.opendaylight.controller.sal.core.ContainerFlow;
28 import org.opendaylight.controller.sal.core.IContainerAware;
29 import org.opendaylight.controller.sal.core.IContainerListener;
30 import org.opendaylight.controller.sal.core.Node;
31 import org.opendaylight.controller.sal.core.Node.NodeIDType;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.core.Property;
34 import org.opendaylight.controller.sal.core.UpdateType;
35 import org.opendaylight.controller.sal.flowprogrammer.Flow;
36 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
37 import org.opendaylight.controller.sal.match.Match;
38 import org.opendaylight.controller.sal.match.MatchType;
39 import org.opendaylight.controller.sal.utils.GlobalConstants;
40 import org.opendaylight.controller.sal.utils.HexEncode;
41 import org.opendaylight.controller.sal.utils.NodeCreator;
42 import org.opendaylight.controller.sal.utils.Status;
43 import org.opendaylight.controller.sal.utils.StatusCode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFlowMod;
46 import org.openflow.protocol.OFFlowRemoved;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPort;
49 import org.openflow.protocol.OFType;
50 import org.openflow.protocol.action.OFAction;
51 import org.osgi.framework.BundleContext;
52 import org.osgi.framework.FrameworkUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Represents the OpenFlow plugin component in charge of programming flows on
58 * the switches and relaying flow programming events to functional modules above SAL.
// Flow programming service of the OpenFlow plugin: exposes add/modify/remove
// flow operations and listens for OpenFlow messages, container events and
// inventory updates (see the implemented interfaces below).
60 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
61 IMessageListener, IContainerListener, IInventoryShimExternalListener,
62 CommandProvider, IContainerAware {
63 private static final Logger log = LoggerFactory
64 .getLogger(FlowProgrammerService.class);
// Controller used to look up switches; assigned by setController() and
// cleared by unsetController().
65 private IController controller;
// Notifiers keyed by container name (see setFlowProgrammerNotifier()).
66 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
// Node connectors known for each container name (see nodeConnectorUpdated()).
67 private Map<String, Set<NodeConnector>> containerToNc;
// Per-switch table mapping a message xid to the caller's request id (rid).
68 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
// Number of async messages after which asyncMsgSend() sends a Barrier;
// read once from getBarrierMessagePriorCount() at construction time.
69 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
// Used by the public operations to check whether a node is local (isLocal()).
70 private IPluginOutConnectionService connectionOutService;
// Creates the service with empty notifier, container-membership and
// xid-to-rid tables.
// NOTE(review): one line of the original constructor body is not visible in
// this listing — confirm against the full source.
72 public FlowProgrammerService() {
74 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
75 containerToNc = new HashMap<String, Set<NodeConnector>>();
76 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
79 public void setController(IController core) {
80 this.controller = core;
83 public void unsetController(IController core) {
84 if (this.controller == core) {
85 this.controller = null;
89 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
90 connectionOutService = s;
93 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
94 if (connectionOutService == s) {
95 connectionOutService = null;
99 public void setFlowProgrammerNotifier(Map<String, ?> props,
100 IFlowProgrammerNotifier s) {
101 if (props == null || props.get("containerName") == null) {
102 log.error("Didn't receive the service correct properties");
105 String containerName = (String) props.get("containerName");
106 this.flowProgrammerNotifiers.put(containerName, s);
109 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
110 IFlowProgrammerNotifier s) {
111 if (props == null || props.get("containerName") == null) {
112 log.error("Didn't receive the service correct properties");
115 String containerName = (String) props.get("containerName");
116 if (this.flowProgrammerNotifiers != null
117 && this.flowProgrammerNotifiers.containsKey(containerName)
118 && this.flowProgrammerNotifiers.get(containerName) == s) {
119 this.flowProgrammerNotifiers.remove(containerName);
124 * Function called by the dependency manager when all the required
125 * dependencies are satisfied
// Subscribes this service to FLOW_REMOVED and ERROR OpenFlow messages on the
// controller, then registers it with the OSGi console.
// NOTE(review): the enclosing method's signature line is not visible in this
// listing; presumably this is the body of init() — confirm against the full source.
129 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
130 this.controller.addMessageListener(OFType.ERROR, this);
131 registerWithOSGIConsole();
135 * Function called by the dependency manager when at least one dependency
136 * becomes unsatisfied or when the component is shutting down because, for
137 * example, the bundle is being stopped.
144 * Function called by dependency manager after "init ()" is called and after
145 * the services provided by the class are registered in the service registry
152 * Function called by the dependency manager before the services exported by
153 * the component are unregistered, this will be followed by a "destroy ()"
161 public Status addFlow(Node node, Flow flow) {
162 if (!connectionOutService.isLocal(node)) {
163 log.debug("Add flow will not be processed in a non-master controller for node " + node);
164 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
167 return addFlowInternal(node, flow, 0);
171 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
172 if (!connectionOutService.isLocal(node)) {
173 log.debug("Modify flow will not be processed in a non-master controller for node " + node);
174 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
177 return modifyFlowInternal(node, oldFlow, newFlow, 0);
181 public Status removeFlow(Node node, Flow flow) {
182 if (!connectionOutService.isLocal(node)) {
183 log.debug("Remove flow will not be processed in a non-master controller for node " + node);
184 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
187 return removeFlowInternal(node, flow, 0);
191 public Status addFlowAsync(Node node, Flow flow, long rid) {
192 if (!connectionOutService.isLocal(node)) {
193 log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
194 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
197 return addFlowInternal(node, flow, rid);
201 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
203 if (!connectionOutService.isLocal(node)) {
204 log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
205 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
208 return modifyFlowInternal(node, oldFlow, newFlow, rid);
212 public Status removeFlowAsync(Node node, Flow flow, long rid) {
213 if (!connectionOutService.isLocal(node)) {
214 log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
215 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
218 return removeFlowInternal(node, flow, rid);
// Builds an OFPFC_ADD flow-mod message for the flow and sends it to the
// node's switch, then converts the send result into a Status.
// Status codes visible below: NOTACCEPTABLE (node is not OPENFLOW), GONE
// ("Switch is not available") and INTERNALERROR ("Internal plugin error").
// NOTE(review): several short lines (branch conditions, declarations, braces)
// are missing from this listing; presumably rid == 0 selects sw.syncSend()
// and any other rid selects asyncMsgSend() — confirm against the full source.
221 private Status addFlowInternal(Node node, Flow flow, long rid) {
222 String action = "add";
// Only OpenFlow nodes can be programmed by this plugin.
223 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
224 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
225 action, "Invalid node type"));
228 if (controller != null) {
// The node ID is cast to Long and used as the switch lookup key.
229 ISwitch sw = controller.getSwitch((Long) node.getID());
231 FlowConverter x = new FlowConverter(flow);
232 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
237 * Synchronous message send. Each message is followed by a
240 result = sw.syncSend(msg);
243 * Message will be sent asynchronously. A Barrier message
244 * will be inserted automatically to synchronize the
247 result = asyncMsgSend(node, sw, msg, rid);
// Translate the raw send result (Boolean, Status or OFError) into a Status.
249 return getStatusInternal(result, action, rid);
251 return new Status(StatusCode.GONE, errorString("send", action,
252 "Switch is not available"));
255 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
256 "Internal plugin error"));
// Replaces oldFlow with newFlow on the node's switch and converts the send
// results into a Status.
// When the priority or the match differs, two messages are prepared: an
// OFPFC_DELETE_STRICT for oldFlow (msg1) and an OFPFC_ADD for newFlow (msg2).
// The other visible assignment prepares a single OFPFC_MODIFY_STRICT for
// newFlow (msg1 only).
// Status codes visible below: NOTACCEPTABLE (node is not OPENFLOW), GONE
// ("Switch is not available") and INTERNALERROR ("Internal plugin error").
// NOTE(review): several short lines (else branches, the sync/async condition,
// early returns, braces) are missing from this listing; presumably rid == 0
// selects sw.syncSend() and the second message is only sent after the first
// one succeeded — confirm against the full source.
259 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
260 String action = "modify";
// Only OpenFlow nodes can be programmed by this plugin.
261 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
262 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
263 action, "Invalid node type"));
265 if (controller != null) {
266 ISwitch sw = controller.getSwitch((Long) node.getID());
268 OFMessage msg1 = null, msg2 = null;
270 // If priority and match portion are the same, send a
271 // modification message
272 if (oldFlow.getPriority() != newFlow.getPriority()
273 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
274 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
275 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
276 msg2 = new FlowConverter(newFlow).getOFFlowMod(
277 OFFlowMod.OFPFC_ADD, null);
279 msg1 = new FlowConverter(newFlow).getOFFlowMod(
280 OFFlowMod.OFPFC_MODIFY_STRICT, null);
283 * Synchronous message send
// The first message is a delete when a second (add) message follows,
// otherwise it is the modify itself; "action" only feeds error strings.
285 action = (msg2 == null) ? "modify" : "delete";
289 * Synchronous message send. Each message is followed by a
292 result = sw.syncSend(msg1);
295 * Message will be sent asynchronously. A Barrier message
296 * will be inserted automatically to synchronize the
299 result = asyncMsgSend(node, sw, msg1, rid);
302 Status rv = getStatusInternal(result, action, rid);
// Nothing more to send, or the first message failed.
303 if ((msg2 == null) || !rv.isSuccess()) {
310 * Synchronous message send. Each message is followed by a
313 result = sw.syncSend(msg2);
316 * Message will be sent asynchronously. A Barrier message
317 * will be inserted automatically to synchronize the
320 result = asyncMsgSend(node, sw, msg2, rid);
322 return getStatusInternal(result, action, rid);
324 return new Status(StatusCode.GONE, errorString("send", action,
325 "Switch is not available"));
328 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
329 "Internal plugin error"));
// Builds an OFPFC_DELETE_STRICT flow-mod message for the flow and sends it
// to the node's switch, then converts the send result into a Status.
// Status codes visible below: NOTACCEPTABLE (node is not OPENFLOW), GONE
// ("Switch is not available") and INTERNALERROR ("Internal plugin error").
// NOTE(review): several short lines (branch conditions, declarations, braces)
// are missing from this listing; presumably rid == 0 selects sw.syncSend()
// and any other rid selects asyncMsgSend() — confirm against the full source.
332 private Status removeFlowInternal(Node node, Flow flow, long rid) {
333 String action = "remove";
// Only OpenFlow nodes can be programmed by this plugin.
334 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
335 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
336 action, "Invalid node type"));
338 if (controller != null) {
339 ISwitch sw = controller.getSwitch((Long) node.getID());
341 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
342 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
346 * Synchronous message send. Each message is followed by a
349 result = sw.syncSend(msg);
352 * Message will be sent asynchronously. A Barrier message
353 * will be inserted automatically to synchronize the
356 result = asyncMsgSend(node, sw, msg, rid);
// Translate the raw send result (Boolean, Status or OFError) into a Status.
358 return getStatusInternal(result, action, rid);
360 return new Status(StatusCode.GONE, errorString("send", action,
361 "Switch is not available"));
364 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
365 "Internal plugin error"));
369 public Status removeAllFlows(Node node) {
370 if (!connectionOutService.isLocal(node)) {
371 log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
372 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
375 return new Status(StatusCode.SUCCESS);
// Builds the description used in failure Status objects.
// With a non-null phase the visible part reads
// "<phase> the <action> flow message: <cause>"; with a null phase it reads
// "<action> the flow: <cause>".
// NOTE(review): the line holding the start of the returned expression is not
// visible in this listing, so any leading prefix is unknown — confirm
// against the full source.
378 private String errorString(String phase, String action, String cause) {
380 + ((phase != null) ? phase + " the " + action
381 + " flow message: " : action + " the flow: ") + cause;
385 public void receive(ISwitch sw, OFMessage msg) {
386 if (msg instanceof OFFlowRemoved) {
387 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
388 } else if (msg instanceof OFError) {
389 handleErrorMessage(sw, (OFError) msg);
// Converts a flow-removed message from a switch into a SAL Flow and reports
// it to registered notifiers through notifier.flowRemoved(node, flow).
// NOTE(review): the first operand of the filter condition and the tail of
// the for-each header are not visible in this listing; presumably the loop
// iterates flowProgrammerNotifiers.entrySet() and a flow without an IN_PORT
// match is reported to every container — confirm against the full source.
393 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
394 Node node = NodeCreator.createOFNode(sw.getId());
// The message carries no actions, so the flow is rebuilt from the match
// with an empty action list.
395 Flow flow = new FlowConverter(msg.getMatch(),
396 new ArrayList<OFAction>(0)).getFlow(node);
397 flow.setPriority(msg.getPriority());
398 flow.setIdleTimeout(msg.getIdleTimeout());
// The message cookie is used as the flow id.
399 flow.setId(msg.getCookie());
401 Match match = flow.getMatch();
// Input port of the removed flow, or null when the match has no IN_PORT field.
402 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
403 .getField(MatchType.IN_PORT).getValue() : null;
405 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
407 String container = containerNotifier.getKey();
408 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
410 * Switch only provide us with the match information. For now let's
411 * try to identify the container membership only from the input port
412 * match field. In any case, upper layer consumers can derive
413 * whether the notification was not for them. More sophisticated
414 * filtering can be added later on.
// The default container is always notified; other containers only when the
// input port is among their recorded node connectors.
417 || container.equals(GlobalConstants.DEFAULT.toString())
418 || (containerToNc.containsKey(container) && containerToNc.get(container).contains(inPort))) {
419 notifier.flowRemoved(node, flow);
424 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
425 Node node = NodeCreator.createOFNode(sw.getId());
426 OFMessage offendingMsg = errorMsg.getOffendingMsg();
428 if (offendingMsg != null) {
429 xid = offendingMsg.getXid();
431 xid = errorMsg.getXid();
434 Long rid = getMessageRid(sw.getId(), xid);
436 * Null or zero requestId indicates that the error message is meant for
437 * a sync message. It will be handled by the sync message worker thread.
438 * Hence we are done here.
440 if ((rid == null) || (rid == 0)) {
445 * Notifies the caller that error has been reported for a previous flow
446 * programming request
448 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
450 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
451 notifier.flowErrorReported(node, rid, Utils.getOFErrorString(errorMsg));
// Container tag update callback.
// NOTE(review): the body is not visible in this listing; presumably the
// event is ignored — confirm against the full source.
456 public void tagUpdated(String containerName, Node n, short oldTag,
457 short newTag, UpdateType t) {
// Container flow update callback.
// NOTE(review): the body is not visible in this listing; presumably the
// event is ignored — confirm against the full source.
462 public void containerFlowUpdated(String containerName,
463 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
// Maintains containerToNc, the per-container set of node connectors, in
// response to node connector updates for a container.
// NOTE(review): the rest of the signature and the branching on the update
// type are not visible in this listing; presumably the first group of lines
// handles an addition and the second a removal — confirm against the full source.
467 public void nodeConnectorUpdated(String containerName, NodeConnector p,
// Create the container's set on first use, then record the connector.
471 if (!containerToNc.containsKey(containerName)) {
472 containerToNc.put(containerName, new HashSet<NodeConnector>());
474 containerToNc.get(containerName).add(p);
// Look up the container's set; it may be absent.
479 Set<NodeConnector> target = containerToNc.get(containerName);
480 if (target != null) {
// Container mode update callback.
// NOTE(review): the body is not visible in this listing; presumably the
// event is ignored — confirm against the full source.
489 public void containerModeUpdated(UpdateType t) {
// Sends a Barrier message to the node's switch through
// sw.syncSendBarrierMessage().
// Status codes visible below: NOTALLOWED (node not local to this controller),
// NOTACCEPTABLE (node is not OPENFLOW), SUCCESS, GONE (no switch for the
// node) and INTERNALERROR.
// NOTE(review): short lines are missing from this listing, including one
// between the send and the SUCCESS return — confirm against the full source.
494 public Status syncSendBarrierMessage(Node node) {
495 if (!connectionOutService.isLocal(node)) {
496 log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
497 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
// Barrier messages are only supported for OpenFlow nodes.
500 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
501 return new Status(StatusCode.NOTACCEPTABLE,
502 "The node does not support Barrier message.");
505 if (controller != null) {
// The node ID is cast to Long and used as the switch lookup key.
506 long swid = (Long) node.getID();
507 ISwitch sw = controller.getSwitch(swid);
509 sw.syncSendBarrierMessage();
511 return (new Status(StatusCode.SUCCESS));
513 return new Status(StatusCode.GONE,
514 "The node does not have a valid Switch reference.");
517 return new Status(StatusCode.INTERNALERROR,
518 "Failed to send Barrier message.");
// Sends a Barrier message to the node's switch through
// sw.asyncSendBarrierMessage().
// Status codes visible below: NOTALLOWED (node not local to this controller),
// NOTACCEPTABLE (node is not OPENFLOW), SUCCESS, GONE (no switch for the
// node) and INTERNALERROR.
// NOTE(review): short lines are missing from this listing, including one
// between the send and the SUCCESS return — confirm against the full source.
522 public Status asyncSendBarrierMessage(Node node) {
523 if (!connectionOutService.isLocal(node)) {
524 log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
525 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
// Barrier messages are only supported for OpenFlow nodes.
528 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
529 return new Status(StatusCode.NOTACCEPTABLE,
530 "The node does not support Barrier message.");
533 if (controller != null) {
// The node ID is cast to Long and used as the switch lookup key.
534 long swid = (Long) node.getID();
535 ISwitch sw = controller.getSwitch(swid);
537 sw.asyncSendBarrierMessage();
539 return (new Status(StatusCode.SUCCESS));
541 return new Status(StatusCode.GONE,
542 "The node does not have a valid Switch reference.");
545 return new Status(StatusCode.INTERNALERROR,
546 "Failed to send Barrier message.");
550 * This method sends the message asynchronously until the number of messages
551 * sent reaches a threshold. Then a Barrier message is sent automatically
552 * for sync purpose. An unique Request ID associated with the message is
553 * passed down by the caller. The Request ID will be returned to the caller
554 * when an error message is received from the switch.
561 * The OF message to be sent
// Sends msg asynchronously on the switch and records the returned xid
// against the caller's rid. Whenever the number of recorded xids for the
// switch is a multiple of barrierMessagePriorCount, a Barrier message is
// sent and its Status becomes the result; otherwise the result stays
// Boolean.TRUE.
// NOTE(review): the xid declaration, the action taken when the switch has no
// xid table, and the final return are not visible in this listing — confirm
// against the full source.
// NOTE(review): a barrierMessagePriorCount of 0 would make the modulo below
// throw ArithmeticException; whether that value can occur is not visible here.
566 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
567 Object result = Boolean.TRUE;
568 long swid = (Long) node.getID();
571 xid = sw.asyncSend(msg);
572 addXid2Rid(swid, xid, rid);
574 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
575 if (swxid2rid == null) {
579 int size = swxid2rid.size();
580 if (size % barrierMessagePriorCount == 0) {
581 result = asyncSendBarrierMessage(node);
588 * A number of async messages are sent followed by a synchronous Barrier
589 * message. This method returns the maximum async messages that can be sent
590 * before the Barrier message.
592 * @return The max count of async messages sent prior to Barrier message
// Reads the "of.barrierMessagePriorCount" system property and parses it as
// an integer.
// NOTE(review): the default value, the null check on the property and the
// body of the catch block are not visible in this listing; presumably a
// missing or unparsable property falls back to a built-in default — confirm
// against the full source.
594 private int getBarrierMessagePriorCount() {
595 String count = System.getProperty("of.barrierMessagePriorCount");
600 rv = Integer.parseInt(count);
601 } catch (Exception e) {
609 * This method returns the message Request ID previously assigned by the
610 * caller for a given OF message xid
616 * @return The Request ID
// Looks up the request id recorded for the given xid in the switch's xid
// table, when the switch has one.
// NOTE(review): the declaration and initial value of rid, any guard on xid
// and the return statement are not visible in this listing — confirm
// against the full source.
618 private Long getMessageRid(long swid, Integer xid) {
625 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
626 if (swxid2rid != null) {
627 rid = swxid2rid.get(xid);
633 * This method returns a copy of outstanding xid to rid mappings for a given
638 * @return a copy of xid2rid mappings
640 public Map<Integer, Long> getSwXid2Rid(long swid) {
641 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
643 if (swxid2rid != null) {
644 return new HashMap<Integer, Long>(swxid2rid);
646 return new HashMap<Integer, Long>();
651 * Adds xid to rid mapping to the local DB
658 * The message Request ID
660 private void addXid2Rid(long swid, int xid, long rid) {
661 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
662 if (swxid2rid != null) {
663 swxid2rid.put(xid, rid);
668 * When an Error message is received, this method will be invoked to remove
669 * the offending xid from the local DB.
676 private void removeXid2Rid(long swid, int xid) {
677 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
678 if (swxid2rid != null) {
679 swxid2rid.remove(xid);
684 * Convert various result into Status
687 * The returned result from previous action
689 * add/modify/delete flow action
691 * The Request ID associated with the flow message
694 private Status getStatusInternal(Object result, String action, long rid) {
695 if (result instanceof Boolean) {
696 return ((Boolean) result == Boolean.TRUE) ? new Status(
697 StatusCode.SUCCESS, rid) : new Status(
698 StatusCode.TIMEOUT, errorString(null, action,
699 "Request Timed Out"));
700 } else if (result instanceof Status) {
701 return (Status) result;
702 } else if (result instanceof OFError) {
703 OFError res = (OFError) result;
704 return new Status(StatusCode.INTERNALERROR, errorString(
705 "program", action, Utils.getOFErrorString(res)));
707 return new Status(StatusCode.INTERNALERROR, errorString(
708 "send", action, "Internal Error"));
713 * When a Barrier reply is received, this method will be invoked to clear
// Looks up the xid table of the given switch and acts on it when present.
// NOTE(review): the statement inside the null check is not visible in this
// listing; presumably it clears the table — confirm against the full source.
719 private void clearXid2Rid(long swid) {
720 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
721 if (swxid2rid != null) {
// Keeps xid2rid in step with node updates: one visible branch installs a
// fresh, empty xid table for the switch, another removes the switch's table.
// NOTE(review): the branching on the update type is not visible in this
// listing; presumably the table is created when the node is added and
// dropped when it is removed — confirm against the full source.
727 public void updateNode(Node node, UpdateType type, Set<Property> props) {
// The node ID is cast to Long and used as the switch key.
728 long swid = (Long)node.getID();
732 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
733 this.xid2rid.put(swid, swxid2rid);
738 this.xid2rid.remove(swid);
// Node connector update callback.
// NOTE(review): the body is not visible in this listing; presumably the
// event is ignored — confirm against the full source.
745 public void updateNodeConnector(NodeConnector nodeConnector,
746 UpdateType type, Set<Property> props) {
// Registers this object as a CommandProvider service so its console commands
// become available.
// NOTE(review): the tails of both statements are not visible in this listing
// (how the BundleContext is obtained from the bundle, and the last argument
// of registerService) — confirm against the full source.
749 private void registerWithOSGIConsole() {
750 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
752 bundleContext.registerService(CommandProvider.class.getName(), this,
757 public String getHelp() {
758 StringBuffer help = new StringBuffer();
759 help.append("-- Flow Programmer Service --\n");
760 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
761 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
762 return help.toString();
// Console command "px2r": prints the outstanding xid to rid mappings of the
// switch whose id is given as the command argument.
// NOTE(review): several guard conditions, early returns and the declaration
// of sid are not visible in this listing; presumably a missing argument is
// rejected with the first "Please enter a valid node id" message — confirm
// against the full source.
765 public void _px2r(CommandInterpreter ci) {
766 String st = ci.nextArgument();
768 ci.println("Please enter a valid node id");
// The argument is parsed with HexEncode.stringToLong(); a
// NumberFormatException is reported to the console.
774 sid = HexEncode.stringToLong(st);
775 } catch (NumberFormatException e) {
776 ci.println("Please enter a valid node id");
780 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
781 if (swxid2rid == null) {
782 ci.println("The node id entered does not exist");
// Header line, followed by one line per outstanding xid.
786 ci.println("xid rid");
788 Set<Integer> xidSet = swxid2rid.keySet();
789 if (xidSet == null) {
793 for (Integer xid : xidSet) {
794 ci.println(xid + " " + swxid2rid.get(xid));
798 public void _px2rc(CommandInterpreter ci) {
799 ci.println("Max num of async messages sent prior to the Barrier message is "
800 + barrierMessagePriorCount);
// Container creation callback.
// NOTE(review): the body is not visible in this listing; presumably the
// event is ignored — confirm against the full source.
804 public void containerCreate(String containerName) {
809 public void containerDestroy(String containerName) {
810 containerToNc.remove(containerName);