2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.core.ContainerFlow;
27 import org.opendaylight.controller.sal.core.IContainerListener;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Node.NodeIDType;
30 import org.opendaylight.controller.sal.core.NodeConnector;
31 import org.opendaylight.controller.sal.core.Property;
32 import org.opendaylight.controller.sal.core.UpdateType;
33 import org.opendaylight.controller.sal.flowprogrammer.Flow;
34 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.match.MatchType;
37 import org.opendaylight.controller.sal.utils.GlobalConstants;
38 import org.opendaylight.controller.sal.utils.HexEncode;
39 import org.opendaylight.controller.sal.utils.NodeCreator;
40 import org.opendaylight.controller.sal.utils.Status;
41 import org.opendaylight.controller.sal.utils.StatusCode;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFFlowRemoved;
45 import org.openflow.protocol.OFMessage;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFType;
48 import org.openflow.protocol.action.OFAction;
49 import org.openflow.protocol.factory.MessageParseException;
50 import org.osgi.framework.BundleContext;
51 import org.osgi.framework.FrameworkUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * Represents the openflow plugin component in charge of programming the flows
57 * the flow programming and relay them to functional modules above SAL.
59 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
60 IMessageListener, IContainerListener, IInventoryShimExternalListener,
62 private static final Logger log = LoggerFactory
63 .getLogger(FlowProgrammerService.class);
64 private IController controller;
65 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
66 private Map<String, Set<NodeConnector>> containerToNc;
67 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
68 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
70 public FlowProgrammerService() {
72 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
73 containerToNc = new HashMap<String, Set<NodeConnector>>();
74 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
77 public void setController(IController core) {
78 this.controller = core;
81 public void unsetController(IController core) {
82 if (this.controller == core) {
83 this.controller = null;
87 public void setFlowProgrammerNotifier(Map<String, ?> props,
88 IFlowProgrammerNotifier s) {
89 if (props == null || props.get("containerName") == null) {
90 log.error("Didn't receive the service correct properties");
93 String containerName = (String) props.get("containerName");
94 this.flowProgrammerNotifiers.put(containerName, s);
97 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
98 IFlowProgrammerNotifier s) {
99 if (props == null || props.get("containerName") == null) {
100 log.error("Didn't receive the service correct properties");
103 String containerName = (String) props.get("containerName");
104 if (this.flowProgrammerNotifiers != null
105 && this.flowProgrammerNotifiers.containsKey(containerName)
106 && this.flowProgrammerNotifiers.get(containerName) == s) {
107 this.flowProgrammerNotifiers.remove(containerName);
112 * Function called by the dependency manager when all the required
113 * dependencies are satisfied
117 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
118 this.controller.addMessageListener(OFType.ERROR, this);
119 registerWithOSGIConsole();
123 * Function called by the dependency manager when at least one dependency
124 * become unsatisfied or when the component is shutting down because for
125 * example bundle is being stopped.
132 * Function called by dependency manager after "init ()" is called and after
133 * the services provided by the class are registered in the service registry
140 * Function called by the dependency manager before the services exported by
141 * the component are unregistered, this will be followed by a "destroy ()"
149 public Status addFlow(Node node, Flow flow) {
150 return addFlowInternal(node, flow, 0);
154 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
155 return modifyFlowInternal(node, oldFlow, newFlow, 0);
159 public Status removeFlow(Node node, Flow flow) {
160 return removeFlowInternal(node, flow, 0);
164 public Status addFlowAsync(Node node, Flow flow, long rid) {
165 return addFlowInternal(node, flow, rid);
169 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
171 return modifyFlowInternal(node, oldFlow, newFlow, rid);
175 public Status removeFlowAsync(Node node, Flow flow, long rid) {
176 return removeFlowInternal(node, flow, rid);
179 private Status addFlowInternal(Node node, Flow flow, long rid) {
180 String action = "add";
181 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
182 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
183 action, "Invalid node type"));
186 if (controller != null) {
187 ISwitch sw = controller.getSwitch((Long) node.getID());
189 FlowConverter x = new FlowConverter(flow);
190 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
195 * Synchronous message send. Each message is followed by a
198 result = sw.syncSend(msg);
200 log.debug("Sending flow asynchronously");
202 * Message will be sent asynchronously. A Barrier message
203 * will be inserted automatically to synchronize the
206 result = asyncMsgSend(node, sw, msg, rid);
208 return getStatusInternal(result, action, rid);
210 return new Status(StatusCode.GONE, errorString("send", action,
211 "Switch is not available"));
214 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
215 "Internal plugin error"));
218 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
219 String action = "modify";
220 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
221 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
222 action, "Invalid node type"));
224 if (controller != null) {
225 ISwitch sw = controller.getSwitch((Long) node.getID());
227 OFMessage msg1 = null, msg2 = null;
229 // If priority and match portion are the same, send a
230 // modification message
231 if (oldFlow.getPriority() != newFlow.getPriority()
232 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
233 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
234 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
235 msg2 = new FlowConverter(newFlow).getOFFlowMod(
236 OFFlowMod.OFPFC_ADD, null);
238 msg1 = new FlowConverter(newFlow).getOFFlowMod(
239 OFFlowMod.OFPFC_MODIFY_STRICT, null);
242 * Synchronous message send
244 action = (msg2 == null) ? "modify" : "delete";
248 * Synchronous message send. Each message is followed by a
251 result = sw.syncSend(msg1);
254 * Message will be sent asynchronously. A Barrier message
255 * will be inserted automatically to synchronize the
258 result = asyncMsgSend(node, sw, msg1, rid);
261 Status rv = getStatusInternal(result, action, rid);
262 if ((msg2 == null) || !rv.isSuccess()) {
269 * Synchronous message send. Each message is followed by a
272 result = sw.syncSend(msg2);
275 * Message will be sent asynchronously. A Barrier message
276 * will be inserted automatically to synchronize the
279 result = asyncMsgSend(node, sw, msg2, rid);
281 return getStatusInternal(result, action, rid);
283 return new Status(StatusCode.GONE, errorString("send", action,
284 "Switch is not available"));
287 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
288 "Internal plugin error"));
291 private Status removeFlowInternal(Node node, Flow flow, long rid) {
292 String action = "remove";
293 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
294 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
295 action, "Invalid node type"));
297 if (controller != null) {
298 ISwitch sw = controller.getSwitch((Long) node.getID());
300 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
301 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
305 * Synchronous message send. Each message is followed by a
308 result = sw.syncSend(msg);
311 * Message will be sent asynchronously. A Barrier message
312 * will be inserted automatically to synchronize the
315 result = asyncMsgSend(node, sw, msg, rid);
317 return getStatusInternal(result, action, rid);
319 return new Status(StatusCode.GONE, errorString("send", action,
320 "Switch is not available"));
323 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
324 "Internal plugin error"));
328 public Status removeAllFlows(Node node) {
329 return new Status(StatusCode.SUCCESS);
332 private String errorString(String phase, String action, String cause) {
334 + ((phase != null) ? phase + " the " + action
335 + " flow message: " : action + " the flow: ") + cause;
339 public void receive(ISwitch sw, OFMessage msg) {
340 if (msg instanceof OFFlowRemoved) {
341 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
342 } else if (msg instanceof OFError) {
343 handleErrorMessage(sw, (OFError) msg);
347 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
348 Node node = NodeCreator.createOFNode(sw.getId());
349 Flow flow = new FlowConverter(msg.getMatch(),
350 new ArrayList<OFAction>(0)).getFlow(node);
351 flow.setPriority(msg.getPriority());
352 flow.setIdleTimeout(msg.getIdleTimeout());
353 flow.setId(msg.getCookie());
355 Match match = flow.getMatch();
356 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
357 .getField(MatchType.IN_PORT).getValue() : null;
359 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
361 String container = containerNotifier.getKey();
362 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
364 * Switch only provide us with the match information. For now let's
365 * try to identify the container membership only from the input port
366 * match field. In any case, upper layer consumers can derive
367 * whether the notification was not for them. More sophisticated
368 * filtering can be added later on.
371 || container.equals(GlobalConstants.DEFAULT.toString())
372 || this.containerToNc.get(container).contains(inPort)) {
373 notifier.flowRemoved(node, flow);
378 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
379 Node node = NodeCreator.createOFNode(sw.getId());
380 OFMessage offendingMsg = null;
382 offendingMsg = errorMsg.getOffendingMsg();
383 } catch (MessageParseException e) {
384 // TODO Auto-generated catch block
388 if (offendingMsg != null) {
389 xid = offendingMsg.getXid();
391 xid = errorMsg.getXid();
394 Long rid = getMessageRid(sw.getId(), xid);
396 * Null or zero requestId indicates that the error message is meant for
397 * a sync message. It will be handled by the sync message worker thread.
398 * Hence we are done here.
400 if ((rid == null) || (rid == 0)) {
405 * Notifies the caller that error has been reported for a previous flow
406 * programming request
408 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
410 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
411 notifier.flowErrorReported(node, rid, errorMsg);
416 public void tagUpdated(String containerName, Node n, short oldTag,
417 short newTag, UpdateType t) {
422 public void containerFlowUpdated(String containerName,
423 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
427 public void nodeConnectorUpdated(String containerName, NodeConnector p,
429 Set<NodeConnector> target = null;
433 if (!containerToNc.containsKey(containerName)) {
434 containerToNc.put(containerName, new HashSet<NodeConnector>());
436 containerToNc.get(containerName).add(p);
441 target = containerToNc.get(containerName);
442 if (target != null) {
451 public void containerModeUpdated(UpdateType t) {
456 public Status syncSendBarrierMessage(Node node) {
457 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
458 return new Status(StatusCode.NOTACCEPTABLE,
459 "The node does not support Barrier message.");
462 if (controller != null) {
463 long swid = (Long) node.getID();
464 ISwitch sw = controller.getSwitch(swid);
466 sw.syncSendBarrierMessage();
468 return (new Status(StatusCode.SUCCESS));
470 return new Status(StatusCode.GONE,
471 "The node does not have a valid Switch reference.");
474 return new Status(StatusCode.INTERNALERROR,
475 "Failed to send Barrier message.");
479 public Status asyncSendBarrierMessage(Node node) {
480 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
481 return new Status(StatusCode.NOTACCEPTABLE,
482 "The node does not support Barrier message.");
485 if (controller != null) {
486 long swid = (Long) node.getID();
487 ISwitch sw = controller.getSwitch(swid);
489 sw.asyncSendBarrierMessage();
491 return (new Status(StatusCode.SUCCESS));
493 return new Status(StatusCode.GONE,
494 "The node does not have a valid Switch reference.");
497 return new Status(StatusCode.INTERNALERROR,
498 "Failed to send Barrier message.");
502 * This method sends the message asynchronously until the number of messages
503 * sent reaches a threshold. Then a Barrier message is sent automatically
504 * for sync purpose. An unique Request ID associated with the message is
505 * passed down by the caller. The Request ID will be returned to the caller
506 * when an error message is received from the switch.
513 * The OF message to be sent
518 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
519 Object result = Boolean.TRUE;
520 long swid = (Long) node.getID();
523 xid = sw.asyncSend(msg);
524 addXid2Rid(swid, xid, rid);
526 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
527 if (swxid2rid == null) {
531 int size = swxid2rid.size();
532 if (size % barrierMessagePriorCount == 0) {
533 result = asyncSendBarrierMessage(node);
540 * A number of async messages are sent followed by a synchronous Barrier
541 * message. This method returns the maximum async messages that can be sent
542 * before the Barrier message.
544 * @return The max count of async messages sent prior to Barrier message
546 private int getBarrierMessagePriorCount() {
547 String count = System.getProperty("of.barrierMessagePriorCount");
552 rv = Integer.parseInt(count);
553 } catch (Exception e) {
561 * This method returns the message Request ID previously assigned by the
562 * caller for a given OF message xid
568 * @return The Request ID
570 private Long getMessageRid(long swid, Integer xid) {
577 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
578 if (swxid2rid != null) {
579 rid = swxid2rid.get(xid);
585 * This method returns a copy of outstanding xid to rid mappings.for a given
590 * @return a copy of xid2rid mappings
592 public Map<Integer, Long> getSwXid2Rid(long swid) {
593 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
595 if (swxid2rid != null) {
596 return new HashMap<Integer, Long>(swxid2rid);
598 return new HashMap<Integer, Long>();
603 * Adds xid to rid mapping to the local DB
610 * The message Request ID
612 private void addXid2Rid(long swid, int xid, long rid) {
613 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
614 if (swxid2rid != null) {
615 swxid2rid.put(xid, rid);
620 * When an Error message is received, this method will be invoked to remove
621 * the offending xid from the local DB.
628 private void removeXid2Rid(long swid, int xid) {
629 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
630 if (swxid2rid != null) {
631 swxid2rid.remove(xid);
636 * Convert various result into Status
639 * The returned result from previous action
641 * add/modify/delete flow action
643 * The Request ID associated with the flow message
646 private Status getStatusInternal(Object result, String action, long rid) {
647 if (result instanceof Boolean) {
648 return ((Boolean) result == Boolean.TRUE) ? new Status(
649 StatusCode.SUCCESS, rid) : new Status(
650 StatusCode.TIMEOUT, errorString(null, action,
651 "Request Timed Out"));
652 } else if (result instanceof Status) {
653 return (Status) result;
654 } else if (result instanceof OFError) {
655 OFError res = (OFError) result;
656 return new Status(StatusCode.INTERNALERROR, errorString(
657 "program", action, Utils.getOFErrorString(res)));
659 return new Status(StatusCode.INTERNALERROR, errorString(
660 "send", action, "Internal Error"));
665 * When a Barrier reply is received, this method will be invoked to clear
671 private void clearXid2Rid(long swid) {
672 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
673 if (swxid2rid != null) {
679 public void updateNode(Node node, UpdateType type, Set<Property> props) {
680 long swid = (Long)node.getID();
684 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
685 this.xid2rid.put(swid, swxid2rid);
690 this.xid2rid.remove(swid);
697 public void updateNodeConnector(NodeConnector nodeConnector,
698 UpdateType type, Set<Property> props) {
701 private void registerWithOSGIConsole() {
702 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
704 bundleContext.registerService(CommandProvider.class.getName(), this,
709 public String getHelp() {
710 StringBuffer help = new StringBuffer();
711 help.append("-- Flow Programmer Service --\n");
712 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
713 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
714 return help.toString();
717 public void _px2r(CommandInterpreter ci) {
718 String st = ci.nextArgument();
720 ci.println("Please enter a valid node id");
726 sid = HexEncode.stringToLong(st);
727 } catch (NumberFormatException e) {
728 ci.println("Please enter a valid node id");
732 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
733 if (swxid2rid == null) {
734 ci.println("The node id entered does not exist");
738 ci.println("xid rid");
740 Set<Integer> xidSet = swxid2rid.keySet();
741 if (xidSet == null) {
745 for (Integer xid : xidSet) {
746 ci.println(xid + " " + swxid2rid.get(xid));
750 public void _px2rc(CommandInterpreter ci) {
751 ci.println("Max num of async messages sent prior to the Barrier message is "
752 + barrierMessagePriorCount);