2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
27 import org.opendaylight.controller.sal.core.ContainerFlow;
28 import org.opendaylight.controller.sal.core.IContainerAware;
29 import org.opendaylight.controller.sal.core.IContainerListener;
30 import org.opendaylight.controller.sal.core.Node;
31 import org.opendaylight.controller.sal.core.Node.NodeIDType;
32 import org.opendaylight.controller.sal.core.NodeConnector;
33 import org.opendaylight.controller.sal.core.Property;
34 import org.opendaylight.controller.sal.core.UpdateType;
35 import org.opendaylight.controller.sal.flowprogrammer.Flow;
36 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
37 import org.opendaylight.controller.sal.match.Match;
38 import org.opendaylight.controller.sal.match.MatchType;
39 import org.opendaylight.controller.sal.utils.EtherTypes;
40 import org.opendaylight.controller.sal.utils.GlobalConstants;
41 import org.opendaylight.controller.sal.utils.HexEncode;
42 import org.opendaylight.controller.sal.utils.NodeCreator;
43 import org.opendaylight.controller.sal.utils.Status;
44 import org.opendaylight.controller.sal.utils.StatusCode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFFlowRemoved;
48 import org.openflow.protocol.OFMessage;
49 import org.openflow.protocol.OFPort;
50 import org.openflow.protocol.OFType;
51 import org.openflow.protocol.action.OFAction;
52 import org.osgi.framework.BundleContext;
53 import org.osgi.framework.FrameworkUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * Represents the OpenFlow plugin component in charge of programming flows on
59 * the switches and relaying flow programming events to functional modules above SAL.
61 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
62 IMessageListener, IContainerListener, IInventoryShimExternalListener,
63 CommandProvider, IContainerAware {
64 private static final Logger log = LoggerFactory
65 .getLogger(FlowProgrammerService.class);
66 private IController controller;
67 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
68 private Map<String, Set<NodeConnector>> containerToNc;
69 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
70 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
71 private IPluginOutConnectionService connectionOutService;
/**
 * Creates the service with empty notifier, container-membership and
 * xid-to-rid tables. No controller or connection service is attached here;
 * those are supplied later through the corresponding setters.
 */
public FlowProgrammerService() {
    this.xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
    this.containerToNc = new HashMap<String, Set<NodeConnector>>();
    this.flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
}
80 public void setController(IController core) {
81 this.controller = core;
84 public void unsetController(IController core) {
85 if (this.controller == core) {
86 this.controller = null;
90 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
91 connectionOutService = s;
94 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
95 if (connectionOutService == s) {
96 connectionOutService = null;
100 public void setFlowProgrammerNotifier(Map<String, ?> props,
101 IFlowProgrammerNotifier s) {
102 if (props == null || props.get("containerName") == null) {
103 log.error("Didn't receive the service correct properties");
106 String containerName = (String) props.get("containerName");
107 this.flowProgrammerNotifiers.put(containerName, s);
110 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
111 IFlowProgrammerNotifier s) {
112 if (props == null || props.get("containerName") == null) {
113 log.error("Didn't receive the service correct properties");
116 String containerName = (String) props.get("containerName");
117 if (this.flowProgrammerNotifiers != null
118 && this.flowProgrammerNotifiers.containsKey(containerName)
119 && this.flowProgrammerNotifiers.get(containerName) == s) {
120 this.flowProgrammerNotifiers.remove(containerName);
125 * Function called by the dependency manager when all the required
126 * dependencies are satisfied
130 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
131 this.controller.addMessageListener(OFType.ERROR, this);
132 registerWithOSGIConsole();
136 * Function called by the dependency manager when at least one dependency
137 * becomes unsatisfied or when the component is shutting down because, for
138 * example, the bundle is being stopped.
145 * Function called by dependency manager after "init ()" is called and after
146 * the services provided by the class are registered in the service registry
153 * Function called by the dependency manager before the services exported by
154 * the component are unregistered, this will be followed by a "destroy ()"
162 public Status addFlow(Node node, Flow flow) {
163 if (!connectionOutService.isLocal(node)) {
164 log.debug("Add flow will not be processed in a non-master controller for node " + node);
165 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
168 return addFlowInternal(node, flow, 0);
172 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
173 if (!connectionOutService.isLocal(node)) {
174 log.debug("Modify flow will not be processed in a non-master controller for node " + node);
175 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
178 return modifyFlowInternal(node, oldFlow, newFlow, 0);
182 public Status removeFlow(Node node, Flow flow) {
183 if (!connectionOutService.isLocal(node)) {
184 log.debug("Remove flow will not be processed in a non-master controller for node " + node);
185 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
188 return removeFlowInternal(node, flow, 0);
192 public Status addFlowAsync(Node node, Flow flow, long rid) {
193 if (!connectionOutService.isLocal(node)) {
194 log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
195 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
198 return addFlowInternal(node, flow, rid);
202 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
204 if (!connectionOutService.isLocal(node)) {
205 log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
206 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
209 return modifyFlowInternal(node, oldFlow, newFlow, rid);
213 public Status removeFlowAsync(Node node, Flow flow, long rid) {
214 if (!connectionOutService.isLocal(node)) {
215 log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
216 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
219 return removeFlowInternal(node, flow, rid);
222 private Status addFlowInternal(Node node, Flow flow, long rid) {
223 String action = "add";
224 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
225 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
226 action, "Invalid node type"));
229 Status status = validateFlow(flow);
230 if (!status.isSuccess()) {
234 if (controller != null) {
235 ISwitch sw = controller.getSwitch((Long) node.getID());
237 FlowConverter x = new FlowConverter(flow);
238 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
243 * Synchronous message send. Each message is followed by a
246 result = sw.syncSend(msg);
249 * Message will be sent asynchronously. A Barrier message
250 * will be inserted automatically to synchronize the
253 result = asyncMsgSend(node, sw, msg, rid);
255 return getStatusInternal(result, action, rid);
257 return new Status(StatusCode.GONE, errorString("send", action,
258 "Switch is not available"));
261 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
262 "Internal plugin error"));
266 * Method which runs openflow 1.0 specific validation on the requested flow
267 * This validation is needed because the openflow switch will silently accept
268 * the request and install only the applicable match fields
270 private Status validateFlow(Flow flow) {
271 Match m = flow.getMatch();
272 boolean isIPEthertypeSet = m.isPresent(MatchType.DL_TYPE)
273 && (m.getField(MatchType.DL_TYPE).getValue().equals(EtherTypes.IPv4.shortValue()) || m
274 .getField(MatchType.DL_TYPE).getValue().equals(EtherTypes.IPv6.shortValue()));
276 // network address check
277 if ((m.isPresent(MatchType.NW_SRC) || m.isPresent(MatchType.NW_DST)) && !isIPEthertypeSet) {
278 return new Status(StatusCode.NOTACCEPTABLE,
279 "The match on network source or destination address cannot be accepted if the match "
280 + "on proper ethertype is missing");
283 // transport protocol check
284 if (m.isPresent(MatchType.NW_PROTO) && !isIPEthertypeSet) {
285 return new Status(StatusCode.NOTACCEPTABLE,
286 "The match on network protocol cannot be accepted if the match on proper ethertype is missing");
289 // transport ports check
290 if ((m.isPresent(MatchType.TP_SRC) || m.isPresent(MatchType.TP_DST))
291 && (!isIPEthertypeSet || m.isAny(MatchType.NW_PROTO))) {
293 StatusCode.NOTACCEPTABLE,
294 "The match on transport source or destination port cannot be accepted if the match on network protocol and match on IP ethertype are missing");
296 return new Status(StatusCode.SUCCESS);
299 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
300 String action = "modify";
301 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
302 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
303 action, "Invalid node type"));
306 Status status = validateFlow(newFlow);
307 if (!status.isSuccess()) {
311 if (controller != null) {
312 ISwitch sw = controller.getSwitch((Long) node.getID());
314 OFMessage msg1 = null, msg2 = null;
316 // If priority and match portion are the same, send a
317 // modification message
318 if (oldFlow.getPriority() != newFlow.getPriority()
319 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
320 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
321 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
322 msg2 = new FlowConverter(newFlow).getOFFlowMod(
323 OFFlowMod.OFPFC_ADD, null);
325 msg1 = new FlowConverter(newFlow).getOFFlowMod(
326 OFFlowMod.OFPFC_MODIFY_STRICT, null);
329 * Synchronous message send
331 action = (msg2 == null) ? "modify" : "delete";
335 * Synchronous message send. Each message is followed by a
338 result = sw.syncSend(msg1);
341 * Message will be sent asynchronously. A Barrier message
342 * will be inserted automatically to synchronize the
345 result = asyncMsgSend(node, sw, msg1, rid);
348 Status rv = getStatusInternal(result, action, rid);
349 if ((msg2 == null) || !rv.isSuccess()) {
356 * Synchronous message send. Each message is followed by a
359 result = sw.syncSend(msg2);
362 * Message will be sent asynchronously. A Barrier message
363 * will be inserted automatically to synchronize the
366 result = asyncMsgSend(node, sw, msg2, rid);
368 return getStatusInternal(result, action, rid);
370 return new Status(StatusCode.GONE, errorString("send", action,
371 "Switch is not available"));
374 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
375 "Internal plugin error"));
378 private Status removeFlowInternal(Node node, Flow flow, long rid) {
379 String action = "remove";
380 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
381 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
382 action, "Invalid node type"));
384 if (controller != null) {
385 ISwitch sw = controller.getSwitch((Long) node.getID());
387 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
388 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
392 * Synchronous message send. Each message is followed by a
395 result = sw.syncSend(msg);
398 * Message will be sent asynchronously. A Barrier message
399 * will be inserted automatically to synchronize the
402 result = asyncMsgSend(node, sw, msg, rid);
404 return getStatusInternal(result, action, rid);
406 return new Status(StatusCode.GONE, errorString("send", action,
407 "Switch is not available"));
410 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
411 "Internal plugin error"));
415 public Status removeAllFlows(Node node) {
416 if (!connectionOutService.isLocal(node)) {
417 log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
418 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
421 return new Status(StatusCode.SUCCESS);
424 private String errorString(String phase, String action, String cause) {
426 + ((phase != null) ? phase + " the " + action
427 + " flow message: " : action + " the flow: ") + cause;
431 public void receive(ISwitch sw, OFMessage msg) {
432 if (msg instanceof OFFlowRemoved) {
433 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
434 } else if (msg instanceof OFError) {
435 handleErrorMessage(sw, (OFError) msg);
439 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
440 Node node = NodeCreator.createOFNode(sw.getId());
441 Flow flow = new FlowConverter(msg.getMatch(),
442 new ArrayList<OFAction>(0)).getFlow(node);
443 flow.setPriority(msg.getPriority());
444 flow.setIdleTimeout(msg.getIdleTimeout());
445 flow.setId(msg.getCookie());
447 Match match = flow.getMatch();
448 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
449 .getField(MatchType.IN_PORT).getValue() : null;
451 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
453 String container = containerNotifier.getKey();
454 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
456 * Switch only provide us with the match information. For now let's
457 * try to identify the container membership only from the input port
458 * match field. In any case, upper layer consumers can derive
459 * whether the notification was not for them. More sophisticated
460 * filtering can be added later on.
463 || container.equals(GlobalConstants.DEFAULT.toString())
464 || (containerToNc.containsKey(container) && containerToNc.get(container).contains(inPort))) {
465 notifier.flowRemoved(node, flow);
470 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
471 Node node = NodeCreator.createOFNode(sw.getId());
472 OFMessage offendingMsg = errorMsg.getOffendingMsg();
474 if (offendingMsg != null) {
475 xid = offendingMsg.getXid();
477 xid = errorMsg.getXid();
480 Long rid = getMessageRid(sw.getId(), xid);
482 * Null or zero requestId indicates that the error message is meant for
483 * a sync message. It will be handled by the sync message worker thread.
484 * Hence we are done here.
486 if ((rid == null) || (rid == 0)) {
491 * Notifies the caller that error has been reported for a previous flow
492 * programming request
494 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
496 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
497 notifier.flowErrorReported(node, rid, Utils.getOFErrorString(errorMsg));
502 public void tagUpdated(String containerName, Node n, short oldTag,
503 short newTag, UpdateType t) {
508 public void containerFlowUpdated(String containerName,
509 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
513 public void nodeConnectorUpdated(String containerName, NodeConnector p,
517 if (!containerToNc.containsKey(containerName)) {
518 containerToNc.put(containerName, new HashSet<NodeConnector>());
520 containerToNc.get(containerName).add(p);
525 Set<NodeConnector> target = containerToNc.get(containerName);
526 if (target != null) {
535 public void containerModeUpdated(UpdateType t) {
540 public Status syncSendBarrierMessage(Node node) {
541 if (!connectionOutService.isLocal(node)) {
542 log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
543 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
546 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
547 return new Status(StatusCode.NOTACCEPTABLE,
548 "The node does not support Barrier message.");
551 if (controller != null) {
552 long swid = (Long) node.getID();
553 ISwitch sw = controller.getSwitch(swid);
555 sw.syncSendBarrierMessage();
557 return (new Status(StatusCode.SUCCESS));
559 return new Status(StatusCode.GONE,
560 "The node does not have a valid Switch reference.");
563 return new Status(StatusCode.INTERNALERROR,
564 "Failed to send Barrier message.");
568 public Status asyncSendBarrierMessage(Node node) {
569 if (!connectionOutService.isLocal(node)) {
570 log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
571 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
574 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
575 return new Status(StatusCode.NOTACCEPTABLE,
576 "The node does not support Barrier message.");
579 if (controller != null) {
580 long swid = (Long) node.getID();
581 ISwitch sw = controller.getSwitch(swid);
583 sw.asyncSendBarrierMessage();
585 return (new Status(StatusCode.SUCCESS));
587 return new Status(StatusCode.GONE,
588 "The node does not have a valid Switch reference.");
591 return new Status(StatusCode.INTERNALERROR,
592 "Failed to send Barrier message.");
596 * This method sends the message asynchronously until the number of messages
597 * sent reaches a threshold. Then a Barrier message is sent automatically
598 * for sync purpose. An unique Request ID associated with the message is
599 * passed down by the caller. The Request ID will be returned to the caller
600 * when an error message is received from the switch.
607 * The OF message to be sent
612 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
613 Object result = Boolean.TRUE;
614 long swid = (Long) node.getID();
617 xid = sw.asyncSend(msg);
618 addXid2Rid(swid, xid, rid);
620 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
621 if (swxid2rid == null) {
625 int size = swxid2rid.size();
626 if (size % barrierMessagePriorCount == 0) {
627 result = asyncSendBarrierMessage(node);
634 * A number of async messages are sent followed by a synchronous Barrier
635 * message. This method returns the maximum async messages that can be sent
636 * before the Barrier message.
638 * @return The max count of async messages sent prior to Barrier message
640 private int getBarrierMessagePriorCount() {
641 String count = System.getProperty("of.barrierMessagePriorCount");
646 rv = Integer.parseInt(count);
647 } catch (Exception e) {
655 * This method returns the message Request ID previously assigned by the
656 * caller for a given OF message xid
662 * @return The Request ID
664 private Long getMessageRid(long swid, Integer xid) {
671 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
672 if (swxid2rid != null) {
673 rid = swxid2rid.get(xid);
679 * This method returns a copy of outstanding xid to rid mappings.for a given
684 * @return a copy of xid2rid mappings
686 public Map<Integer, Long> getSwXid2Rid(long swid) {
687 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
689 if (swxid2rid != null) {
690 return new HashMap<Integer, Long>(swxid2rid);
692 return new HashMap<Integer, Long>();
697 * Adds xid to rid mapping to the local DB
704 * The message Request ID
706 private void addXid2Rid(long swid, int xid, long rid) {
707 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
708 if (swxid2rid != null) {
709 swxid2rid.put(xid, rid);
714 * When an Error message is received, this method will be invoked to remove
715 * the offending xid from the local DB.
722 private void removeXid2Rid(long swid, int xid) {
723 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
724 if (swxid2rid != null) {
725 swxid2rid.remove(xid);
730 * Convert various result into Status
733 * The returned result from previous action
735 * add/modify/delete flow action
737 * The Request ID associated with the flow message
740 private Status getStatusInternal(Object result, String action, long rid) {
741 if (result instanceof Boolean) {
742 return ((Boolean) result == Boolean.TRUE) ? new Status(
743 StatusCode.SUCCESS, rid) : new Status(
744 StatusCode.TIMEOUT, errorString(null, action,
745 "Request Timed Out"));
746 } else if (result instanceof Status) {
747 return (Status) result;
748 } else if (result instanceof OFError) {
749 OFError res = (OFError) result;
750 return new Status(StatusCode.INTERNALERROR, errorString(
751 "program", action, Utils.getOFErrorString(res)));
753 return new Status(StatusCode.INTERNALERROR, errorString(
754 "send", action, "Internal Error"));
759 * When a Barrier reply is received, this method will be invoked to clear
765 private void clearXid2Rid(long swid) {
766 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
767 if (swxid2rid != null) {
773 public void updateNode(Node node, UpdateType type, Set<Property> props) {
774 long swid = (Long)node.getID();
778 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
779 this.xid2rid.put(swid, swxid2rid);
784 this.xid2rid.remove(swid);
791 public void updateNodeConnector(NodeConnector nodeConnector,
792 UpdateType type, Set<Property> props) {
795 private void registerWithOSGIConsole() {
796 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
798 bundleContext.registerService(CommandProvider.class.getName(), this,
803 public String getHelp() {
804 StringBuffer help = new StringBuffer();
805 help.append("-- Flow Programmer Service --\n");
806 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
807 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
808 return help.toString();
811 public void _px2r(CommandInterpreter ci) {
812 String st = ci.nextArgument();
814 ci.println("Please enter a valid node id");
820 sid = HexEncode.stringToLong(st);
821 } catch (NumberFormatException e) {
822 ci.println("Please enter a valid node id");
826 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
827 if (swxid2rid == null) {
828 ci.println("The node id entered does not exist");
832 ci.println("xid rid");
834 Set<Integer> xidSet = swxid2rid.keySet();
835 if (xidSet == null) {
839 for (Integer xid : xidSet) {
840 ci.println(xid + " " + swxid2rid.get(xid));
844 public void _px2rc(CommandInterpreter ci) {
845 ci.println("Max num of async messages sent prior to the Barrier message is "
846 + barrierMessagePriorCount);
850 public void containerCreate(String containerName) {
855 public void containerDestroy(String containerName) {
856 containerToNc.remove(containerName);