2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
27 import org.opendaylight.controller.sal.core.ContainerFlow;
28 import org.opendaylight.controller.sal.core.IContainerListener;
29 import org.opendaylight.controller.sal.core.Node;
30 import org.opendaylight.controller.sal.core.Node.NodeIDType;
31 import org.opendaylight.controller.sal.core.NodeConnector;
32 import org.opendaylight.controller.sal.core.Property;
33 import org.opendaylight.controller.sal.core.UpdateType;
34 import org.opendaylight.controller.sal.flowprogrammer.Flow;
35 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
36 import org.opendaylight.controller.sal.match.Match;
37 import org.opendaylight.controller.sal.match.MatchType;
38 import org.opendaylight.controller.sal.utils.GlobalConstants;
39 import org.opendaylight.controller.sal.utils.HexEncode;
40 import org.opendaylight.controller.sal.utils.NodeCreator;
41 import org.opendaylight.controller.sal.utils.Status;
42 import org.opendaylight.controller.sal.utils.StatusCode;
43 import org.openflow.protocol.OFError;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFFlowRemoved;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFType;
49 import org.openflow.protocol.action.OFAction;
50 import org.osgi.framework.BundleContext;
51 import org.osgi.framework.FrameworkUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
 * Represents the OpenFlow plugin component in charge of programming flows on
 * the switches and of relaying flow-related notifications to the functional modules above SAL.
59 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
60 IMessageListener, IContainerListener, IInventoryShimExternalListener,
62 private static final Logger log = LoggerFactory
63 .getLogger(FlowProgrammerService.class);
64 private IController controller;
65 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
66 private Map<String, Set<NodeConnector>> containerToNc;
67 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
68 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
69 private IPluginOutConnectionService connectionOutService;
/**
 * Creates the service with empty lookup tables: the per-container
 * notifiers, the per-container node connector sets and the per-switch
 * xid-to-request-id mappings.
 */
public FlowProgrammerService() {
    this.xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
    this.containerToNc = new HashMap<String, Set<NodeConnector>>();
    this.flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
}
78 public void setController(IController core) {
79 this.controller = core;
82 public void unsetController(IController core) {
83 if (this.controller == core) {
84 this.controller = null;
88 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
89 connectionOutService = s;
92 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
93 if (connectionOutService == s) {
94 connectionOutService = null;
98 public void setFlowProgrammerNotifier(Map<String, ?> props,
99 IFlowProgrammerNotifier s) {
100 if (props == null || props.get("containerName") == null) {
101 log.error("Didn't receive the service correct properties");
104 String containerName = (String) props.get("containerName");
105 this.flowProgrammerNotifiers.put(containerName, s);
108 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
109 IFlowProgrammerNotifier s) {
110 if (props == null || props.get("containerName") == null) {
111 log.error("Didn't receive the service correct properties");
114 String containerName = (String) props.get("containerName");
115 if (this.flowProgrammerNotifiers != null
116 && this.flowProgrammerNotifiers.containsKey(containerName)
117 && this.flowProgrammerNotifiers.get(containerName) == s) {
118 this.flowProgrammerNotifiers.remove(containerName);
123 * Function called by the dependency manager when all the required
124 * dependencies are satisfied
// Subscribe this object to the two message types that receive() dispatches
// on: flow-removed notifications and error replies.
// NOTE(review): the controller reference is dereferenced without a null
// check; presumably it is always set before this runs - confirm.
128 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
129 this.controller.addMessageListener(OFType.ERROR, this);
// Register this object as a console command provider.
130 registerWithOSGIConsole();
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down, for example
 * because the bundle is being stopped.
143 * Function called by dependency manager after "init ()" is called and after
144 * the services provided by the class are registered in the service registry
151 * Function called by the dependency manager before the services exported by
152 * the component are unregistered, this will be followed by a "destroy ()"
160 public Status addFlow(Node node, Flow flow) {
161 if (!connectionOutService.isLocal(node)) {
162 log.debug("Add flow will not be processed in a non-master controller for node " + node);
163 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
166 return addFlowInternal(node, flow, 0);
170 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
171 if (!connectionOutService.isLocal(node)) {
172 log.debug("Modify flow will not be processed in a non-master controller for node " + node);
173 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
176 return modifyFlowInternal(node, oldFlow, newFlow, 0);
180 public Status removeFlow(Node node, Flow flow) {
181 if (!connectionOutService.isLocal(node)) {
182 log.debug("Remove flow will not be processed in a non-master controller for node " + node);
183 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
186 return removeFlowInternal(node, flow, 0);
190 public Status addFlowAsync(Node node, Flow flow, long rid) {
191 if (!connectionOutService.isLocal(node)) {
192 log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
193 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
196 return addFlowInternal(node, flow, rid);
200 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
202 if (!connectionOutService.isLocal(node)) {
203 log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
204 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
207 return modifyFlowInternal(node, oldFlow, newFlow, rid);
211 public Status removeFlowAsync(Node node, Flow flow, long rid) {
212 if (!connectionOutService.isLocal(node)) {
213 log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
214 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
217 return removeFlowInternal(node, flow, rid);
220 private Status addFlowInternal(Node node, Flow flow, long rid) {
221 String action = "add";
222 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
223 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
224 action, "Invalid node type"));
227 if (controller != null) {
228 ISwitch sw = controller.getSwitch((Long) node.getID());
230 FlowConverter x = new FlowConverter(flow);
231 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
236 * Synchronous message send. Each message is followed by a
239 result = sw.syncSend(msg);
242 * Message will be sent asynchronously. A Barrier message
243 * will be inserted automatically to synchronize the
246 result = asyncMsgSend(node, sw, msg, rid);
248 return getStatusInternal(result, action, rid);
250 return new Status(StatusCode.GONE, errorString("send", action,
251 "Switch is not available"));
254 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
255 "Internal plugin error"));
258 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
259 String action = "modify";
260 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
261 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
262 action, "Invalid node type"));
264 if (controller != null) {
265 ISwitch sw = controller.getSwitch((Long) node.getID());
267 OFMessage msg1 = null, msg2 = null;
269 // If priority and match portion are the same, send a
270 // modification message
271 if (oldFlow.getPriority() != newFlow.getPriority()
272 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
273 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
274 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
275 msg2 = new FlowConverter(newFlow).getOFFlowMod(
276 OFFlowMod.OFPFC_ADD, null);
278 msg1 = new FlowConverter(newFlow).getOFFlowMod(
279 OFFlowMod.OFPFC_MODIFY_STRICT, null);
282 * Synchronous message send
284 action = (msg2 == null) ? "modify" : "delete";
288 * Synchronous message send. Each message is followed by a
291 result = sw.syncSend(msg1);
294 * Message will be sent asynchronously. A Barrier message
295 * will be inserted automatically to synchronize the
298 result = asyncMsgSend(node, sw, msg1, rid);
301 Status rv = getStatusInternal(result, action, rid);
302 if ((msg2 == null) || !rv.isSuccess()) {
309 * Synchronous message send. Each message is followed by a
312 result = sw.syncSend(msg2);
315 * Message will be sent asynchronously. A Barrier message
316 * will be inserted automatically to synchronize the
319 result = asyncMsgSend(node, sw, msg2, rid);
321 return getStatusInternal(result, action, rid);
323 return new Status(StatusCode.GONE, errorString("send", action,
324 "Switch is not available"));
327 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
328 "Internal plugin error"));
331 private Status removeFlowInternal(Node node, Flow flow, long rid) {
332 String action = "remove";
333 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
334 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
335 action, "Invalid node type"));
337 if (controller != null) {
338 ISwitch sw = controller.getSwitch((Long) node.getID());
340 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
341 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
345 * Synchronous message send. Each message is followed by a
348 result = sw.syncSend(msg);
351 * Message will be sent asynchronously. A Barrier message
352 * will be inserted automatically to synchronize the
355 result = asyncMsgSend(node, sw, msg, rid);
357 return getStatusInternal(result, action, rid);
359 return new Status(StatusCode.GONE, errorString("send", action,
360 "Switch is not available"));
363 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
364 "Internal plugin error"));
368 public Status removeAllFlows(Node node) {
369 if (!connectionOutService.isLocal(node)) {
370 log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
371 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
374 return new Status(StatusCode.SUCCESS);
// Builds the description text used in failure Status objects.
// With a non-null phase the visible part reads
// "<phase> the <action> flow message: <cause>"; with a null phase it reads
// "<action> the flow: <cause>".
// NOTE(review): the start of the return expression is not visible here, so
// any text prepended to the part below is unknown - confirm against the
// full file.
377 private String errorString(String phase, String action, String cause) {
379 + ((phase != null) ? phase + " the " + action
380 + " flow message: " : action + " the flow: ") + cause;
384 public void receive(ISwitch sw, OFMessage msg) {
385 if (msg instanceof OFFlowRemoved) {
386 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
387 } else if (msg instanceof OFError) {
388 handleErrorMessage(sw, (OFError) msg);
392 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
393 Node node = NodeCreator.createOFNode(sw.getId());
394 Flow flow = new FlowConverter(msg.getMatch(),
395 new ArrayList<OFAction>(0)).getFlow(node);
396 flow.setPriority(msg.getPriority());
397 flow.setIdleTimeout(msg.getIdleTimeout());
398 flow.setId(msg.getCookie());
400 Match match = flow.getMatch();
401 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
402 .getField(MatchType.IN_PORT).getValue() : null;
404 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
406 String container = containerNotifier.getKey();
407 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
409 * Switch only provide us with the match information. For now let's
410 * try to identify the container membership only from the input port
411 * match field. In any case, upper layer consumers can derive
412 * whether the notification was not for them. More sophisticated
413 * filtering can be added later on.
416 || container.equals(GlobalConstants.DEFAULT.toString())
417 || this.containerToNc.get(container).contains(inPort)) {
418 notifier.flowRemoved(node, flow);
423 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
424 Node node = NodeCreator.createOFNode(sw.getId());
425 OFMessage offendingMsg = errorMsg.getOffendingMsg();
427 if (offendingMsg != null) {
428 xid = offendingMsg.getXid();
430 xid = errorMsg.getXid();
433 Long rid = getMessageRid(sw.getId(), xid);
435 * Null or zero requestId indicates that the error message is meant for
436 * a sync message. It will be handled by the sync message worker thread.
437 * Hence we are done here.
439 if ((rid == null) || (rid == 0)) {
444 * Notifies the caller that error has been reported for a previous flow
445 * programming request
447 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
449 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
450 notifier.flowErrorReported(node, rid, errorMsg);
455 public void tagUpdated(String containerName, Node n, short oldTag,
456 short newTag, UpdateType t) {
461 public void containerFlowUpdated(String containerName,
462 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
// Container callback that maintains containerToNc, the per-container set of
// node connectors consulted when flow-removed notifications are filtered.
// NOTE(review): the lines that choose between the two paths below are not
// visible here; presumably they branch on the update type - confirm against
// the full file.
466 public void nodeConnectorUpdated(String containerName, NodeConnector p,
468 Set<NodeConnector> target = null;
// First path: create the container's set on first use, then record p in it.
472 if (!containerToNc.containsKey(containerName)) {
473 containerToNc.put(containerName, new HashSet<NodeConnector>());
475 containerToNc.get(containerName).add(p);
// Second path: look up the container's existing set; a container that was
// never recorded yields null, which is guarded against below.
480 target = containerToNc.get(containerName);
481 if (target != null) {
490 public void containerModeUpdated(UpdateType t) {
495 public Status syncSendBarrierMessage(Node node) {
496 if (!connectionOutService.isLocal(node)) {
497 log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
498 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
501 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
502 return new Status(StatusCode.NOTACCEPTABLE,
503 "The node does not support Barrier message.");
506 if (controller != null) {
507 long swid = (Long) node.getID();
508 ISwitch sw = controller.getSwitch(swid);
510 sw.syncSendBarrierMessage();
512 return (new Status(StatusCode.SUCCESS));
514 return new Status(StatusCode.GONE,
515 "The node does not have a valid Switch reference.");
518 return new Status(StatusCode.INTERNALERROR,
519 "Failed to send Barrier message.");
523 public Status asyncSendBarrierMessage(Node node) {
524 if (!connectionOutService.isLocal(node)) {
525 log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
526 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
529 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
530 return new Status(StatusCode.NOTACCEPTABLE,
531 "The node does not support Barrier message.");
534 if (controller != null) {
535 long swid = (Long) node.getID();
536 ISwitch sw = controller.getSwitch(swid);
538 sw.asyncSendBarrierMessage();
540 return (new Status(StatusCode.SUCCESS));
542 return new Status(StatusCode.GONE,
543 "The node does not have a valid Switch reference.");
546 return new Status(StatusCode.INTERNALERROR,
547 "Failed to send Barrier message.");
551 * This method sends the message asynchronously until the number of messages
552 * sent reaches a threshold. Then a Barrier message is sent automatically
553 * for sync purpose. An unique Request ID associated with the message is
554 * passed down by the caller. The Request ID will be returned to the caller
555 * when an error message is received from the switch.
562 * The OF message to be sent
567 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
568 Object result = Boolean.TRUE;
569 long swid = (Long) node.getID();
572 xid = sw.asyncSend(msg);
573 addXid2Rid(swid, xid, rid);
575 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
576 if (swxid2rid == null) {
580 int size = swxid2rid.size();
581 if (size % barrierMessagePriorCount == 0) {
582 result = asyncSendBarrierMessage(node);
589 * A number of async messages are sent followed by a synchronous Barrier
590 * message. This method returns the maximum async messages that can be sent
591 * before the Barrier message.
593 * @return The max count of async messages sent prior to Barrier message
595 private int getBarrierMessagePriorCount() {
// Optional override taken from the JVM system properties.
596 String count = System.getProperty("of.barrierMessagePriorCount");
// NOTE(review): the declaration and initial value of `rv` are not visible
// here; presumably a built-in default is used when the property is absent
// or unparsable - confirm against the full file.
// NOTE(review): a parsed value of 0 is accepted as-is; the result is later
// used as the divisor of a remainder operation in asyncMsgSend.
601 rv = Integer.parseInt(count);
// NOTE(review): every exception type is caught here and no handling is
// visible; a malformed property value appears to be ignored silently.
602 } catch (Exception e) {
610 * This method returns the message Request ID previously assigned by the
611 * caller for a given OF message xid
617 * @return The Request ID
// Looks up the request id recorded for the given xid in the table of the
// given switch.
// NOTE(review): the initialisation of `rid`, any early return and the final
// return statement are not visible here - confirm what is returned when the
// switch has no table.
619 private Long getMessageRid(long swid, Integer xid) {
626 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
627 if (swxid2rid != null) {
// Yields null when this xid was never recorded for the switch.
628 rid = swxid2rid.get(xid);
634 * This method returns a copy of outstanding xid to rid mappings.for a given
639 * @return a copy of xid2rid mappings
641 public Map<Integer, Long> getSwXid2Rid(long swid) {
642 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
644 if (swxid2rid != null) {
645 return new HashMap<Integer, Long>(swxid2rid);
647 return new HashMap<Integer, Long>();
652 * Adds xid to rid mapping to the local DB
659 * The message Request ID
661 private void addXid2Rid(long swid, int xid, long rid) {
662 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
663 if (swxid2rid != null) {
664 swxid2rid.put(xid, rid);
669 * When an Error message is received, this method will be invoked to remove
670 * the offending xid from the local DB.
677 private void removeXid2Rid(long swid, int xid) {
678 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
679 if (swxid2rid != null) {
680 swxid2rid.remove(xid);
685 * Convert various result into Status
688 * The returned result from previous action
690 * add/modify/delete flow action
692 * The Request ID associated with the flow message
695 private Status getStatusInternal(Object result, String action, long rid) {
696 if (result instanceof Boolean) {
697 return ((Boolean) result == Boolean.TRUE) ? new Status(
698 StatusCode.SUCCESS, rid) : new Status(
699 StatusCode.TIMEOUT, errorString(null, action,
700 "Request Timed Out"));
701 } else if (result instanceof Status) {
702 return (Status) result;
703 } else if (result instanceof OFError) {
704 OFError res = (OFError) result;
705 return new Status(StatusCode.INTERNALERROR, errorString(
706 "program", action, Utils.getOFErrorString(res)));
708 return new Status(StatusCode.INTERNALERROR, errorString(
709 "send", action, "Internal Error"));
714 * When a Barrier reply is received, this method will be invoked to clear
720 private void clearXid2Rid(long swid) {
721 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
722 if (swxid2rid != null) {
// Inventory callback that keeps xid2rid in step with the known switches.
// The node id is cast to Long and used as the table key.
// NOTE(review): the lines that choose between the two actions below are not
// visible here; presumably they branch on `type` - confirm against the full
// file.
728 public void updateNode(Node node, UpdateType type, Set<Property> props) {
729 long swid = (Long)node.getID();
// First action: install a fresh, empty xid->rid table for the switch,
// replacing any table already stored under that id.
733 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
734 this.xid2rid.put(swid, swxid2rid);
// Second action: drop the switch's table together with all mappings in it.
739 this.xid2rid.remove(swid);
746 public void updateNodeConnector(NodeConnector nodeConnector,
747 UpdateType type, Set<Property> props) {
// Registers this object as a service under the CommandProvider interface
// name, using the bundle context obtained for this class.
// NOTE(review): the continuation lines of both statements are not visible
// here (the call that turns the bundle into a BundleContext, and the last
// argument of registerService) - confirm against the full file.
// NOTE(review): presumably FrameworkUtil.getBundle returns null when the
// class was not loaded by a bundle; no null check is visible - confirm.
750 private void registerWithOSGIConsole() {
751 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
753 bundleContext.registerService(CommandProvider.class.getName(), this,
758 public String getHelp() {
759 StringBuffer help = new StringBuffer();
760 help.append("-- Flow Programmer Service --\n");
761 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
762 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
763 return help.toString();
766 public void _px2r(CommandInterpreter ci) {
767 String st = ci.nextArgument();
769 ci.println("Please enter a valid node id");
775 sid = HexEncode.stringToLong(st);
776 } catch (NumberFormatException e) {
777 ci.println("Please enter a valid node id");
781 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
782 if (swxid2rid == null) {
783 ci.println("The node id entered does not exist");
787 ci.println("xid rid");
789 Set<Integer> xidSet = swxid2rid.keySet();
790 if (xidSet == null) {
794 for (Integer xid : xidSet) {
795 ci.println(xid + " " + swxid2rid.get(xid));
799 public void _px2rc(CommandInterpreter ci) {
800 ci.println("Max num of async messages sent prior to the Barrier message is "
801 + barrierMessagePriorCount);