2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.core.ContainerFlow;
27 import org.opendaylight.controller.sal.core.IContainerListener;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Node.NodeIDType;
30 import org.opendaylight.controller.sal.core.NodeConnector;
31 import org.opendaylight.controller.sal.core.Property;
32 import org.opendaylight.controller.sal.core.UpdateType;
33 import org.opendaylight.controller.sal.flowprogrammer.Flow;
34 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.match.MatchType;
37 import org.opendaylight.controller.sal.utils.GlobalConstants;
38 import org.opendaylight.controller.sal.utils.HexEncode;
39 import org.opendaylight.controller.sal.utils.NodeCreator;
40 import org.opendaylight.controller.sal.utils.Status;
41 import org.opendaylight.controller.sal.utils.StatusCode;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFFlowRemoved;
45 import org.openflow.protocol.OFMessage;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFType;
48 import org.openflow.protocol.action.OFAction;
49 import org.osgi.framework.BundleContext;
50 import org.osgi.framework.FrameworkUtil;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Represents the OpenFlow plugin component in charge of programming flows on
56 * the switches and relaying flow-related switch notifications to the functional modules above SAL.
// OpenFlow plugin component: programs flows on switches and forwards
// flow-removed and error notifications to the registered notifiers.
58 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
59 IMessageListener, IContainerListener, IInventoryShimExternalListener,
61 private static final Logger log = LoggerFactory
62 .getLogger(FlowProgrammerService.class);
// Controller core; used to look up switches and to register message listeners.
63 private IController controller;
// Flow programmer notifiers, keyed by container name.
64 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
// Node connectors that belong to each container, keyed by container name.
65 private Map<String, Set<NodeConnector>> containerToNc;
// Per switch id: OpenFlow transaction id (xid) -> caller request id (rid).
66 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
// Modulus applied to the per-switch xid table size in asyncMsgSend to decide
// when a Barrier message is sent.
67 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
// Creates the service with empty notifier, container-membership and
// xid-to-rid tables.
// NOTE(review): one line of this constructor is absent from this listing, so
// there may be initialization that is not shown here - confirm against the file.
69 public FlowProgrammerService() {
71 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
72 containerToNc = new HashMap<String, Set<NodeConnector>>();
73 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
76 public void setController(IController core) {
77 this.controller = core;
80 public void unsetController(IController core) {
81 if (this.controller == core) {
82 this.controller = null;
86 public void setFlowProgrammerNotifier(Map<String, ?> props,
87 IFlowProgrammerNotifier s) {
88 if (props == null || props.get("containerName") == null) {
89 log.error("Didn't receive the service correct properties");
92 String containerName = (String) props.get("containerName");
93 this.flowProgrammerNotifiers.put(containerName, s);
96 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
97 IFlowProgrammerNotifier s) {
98 if (props == null || props.get("containerName") == null) {
99 log.error("Didn't receive the service correct properties");
102 String containerName = (String) props.get("containerName");
103 if (this.flowProgrammerNotifiers != null
104 && this.flowProgrammerNotifiers.containsKey(containerName)
105 && this.flowProgrammerNotifiers.get(containerName) == s) {
106 this.flowProgrammerNotifiers.remove(containerName);
111 * Function called by the dependency manager when all the required
112 * dependencies are satisfied
// NOTE(review): the header of the method containing these statements is not
// present in this listing; the surrounding comments suggest it is the
// dependency-manager "init" callback - confirm against the full file.
// Subscribes this object to FLOW_REMOVED and ERROR OpenFlow messages and
// registers it with the OSGi console.
116 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
117 this.controller.addMessageListener(OFType.ERROR, this);
118 registerWithOSGIConsole();
122 * Function called by the dependency manager when at least one dependency
123 * becomes unsatisfied or when the component is shutting down because, for
124 * example, the bundle is being stopped.
131 * Function called by dependency manager after "init ()" is called and after
132 * the services provided by the class are registered in the service registry
139 * Function called by the dependency manager before the services exported by
140 * the component are unregistered, this will be followed by a "destroy ()"
148 public Status addFlow(Node node, Flow flow) {
149 return addFlowInternal(node, flow, 0);
153 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
154 return modifyFlowInternal(node, oldFlow, newFlow, 0);
158 public Status removeFlow(Node node, Flow flow) {
159 return removeFlowInternal(node, flow, 0);
163 public Status addFlowAsync(Node node, Flow flow, long rid) {
164 return addFlowInternal(node, flow, rid);
168 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
170 return modifyFlowInternal(node, oldFlow, newFlow, rid);
174 public Status removeFlowAsync(Node node, Flow flow, long rid) {
175 return removeFlowInternal(node, flow, rid);
178 private Status addFlowInternal(Node node, Flow flow, long rid) {
179 String action = "add";
180 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
181 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
182 action, "Invalid node type"));
185 if (controller != null) {
186 ISwitch sw = controller.getSwitch((Long) node.getID());
188 FlowConverter x = new FlowConverter(flow);
189 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
194 * Synchronous message send. Each message is followed by a
197 result = sw.syncSend(msg);
200 * Message will be sent asynchronously. A Barrier message
201 * will be inserted automatically to synchronize the
204 result = asyncMsgSend(node, sw, msg, rid);
206 return getStatusInternal(result, action, rid);
208 return new Status(StatusCode.GONE, errorString("send", action,
209 "Switch is not available"));
212 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
213 "Internal plugin error"));
// Replaces oldFlow with newFlow on the switch behind the node.
// When priority or match differ, a strict delete of the old flow (msg1) and an
// add of the new flow (msg2) are built; otherwise a single strict modify (msg1).
// Each message goes out through sw.syncSend or asyncMsgSend and the result is
// converted with getStatusInternal.
// Visible return statuses: NOTACCEPTABLE for a non-OpenFlow node, GONE with
// "Switch is not available", INTERNALERROR with "Internal plugin error".
// NOTE(review): several lines of this method are absent from this listing (the
// branch conditions selecting sync vs async send, the declaration of 'result',
// and the statements between the two sends) - confirm against the full file
// before relying on the exact control flow.
216 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
217 String action = "modify";
218 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
219 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
220 action, "Invalid node type"));
222 if (controller != null) {
223 ISwitch sw = controller.getSwitch((Long) node.getID());
225 OFMessage msg1 = null, msg2 = null;
227 // If priority and match portion are the same, send a
228 // modification message
// The test below is the inverse case: a differing priority or match cannot be
// modified in place, so the old flow is deleted and the new one added.
229 if (oldFlow.getPriority() != newFlow.getPriority()
230 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
231 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
232 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
233 msg2 = new FlowConverter(newFlow).getOFFlowMod(
234 OFFlowMod.OFPFC_ADD, null);
236 msg1 = new FlowConverter(newFlow).getOFFlowMod(
237 OFFlowMod.OFPFC_MODIFY_STRICT, null);
240 * Synchronous message send
// The action label used in error strings follows what msg1 actually is.
242 action = (msg2 == null) ? "modify" : "delete";
246 * Synchronous message send. Each message is followed by a
249 result = sw.syncSend(msg1);
252 * Message will be sent asynchronously. A Barrier message
253 * will be inserted automatically to synchronize the
256 result = asyncMsgSend(node, sw, msg1, rid);
// The second message is only relevant when there is one and the first succeeded.
259 Status rv = getStatusInternal(result, action, rid);
260 if ((msg2 == null) || !rv.isSuccess()) {
267 * Synchronous message send. Each message is followed by a
270 result = sw.syncSend(msg2);
273 * Message will be sent asynchronously. A Barrier message
274 * will be inserted automatically to synchronize the
277 result = asyncMsgSend(node, sw, msg2, rid);
279 return getStatusInternal(result, action, rid);
281 return new Status(StatusCode.GONE, errorString("send", action,
282 "Switch is not available"));
285 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
286 "Internal plugin error"));
289 private Status removeFlowInternal(Node node, Flow flow, long rid) {
290 String action = "remove";
291 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
292 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
293 action, "Invalid node type"));
295 if (controller != null) {
296 ISwitch sw = controller.getSwitch((Long) node.getID());
298 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
299 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
303 * Synchronous message send. Each message is followed by a
306 result = sw.syncSend(msg);
309 * Message will be sent asynchronously. A Barrier message
310 * will be inserted automatically to synchronize the
313 result = asyncMsgSend(node, sw, msg, rid);
315 return getStatusInternal(result, action, rid);
317 return new Status(StatusCode.GONE, errorString("send", action,
318 "Switch is not available"));
321 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
322 "Internal plugin error"));
326 public Status removeAllFlows(Node node) {
327 return new Status(StatusCode.SUCCESS);
// Builds the text used in failure Status objects.
// With a non-null phase the visible part reads
// "<phase> the <action> flow message: <cause>"; with a null phase it reads
// "<action> the flow: <cause>".
// NOTE(review): the line that starts the return expression (whatever is
// concatenated in front of this text) is absent from this listing - confirm
// the prefix against the full file.
330 private String errorString(String phase, String action, String cause) {
332 + ((phase != null) ? phase + " the " + action
333 + " flow message: " : action + " the flow: ") + cause;
337 public void receive(ISwitch sw, OFMessage msg) {
338 if (msg instanceof OFFlowRemoved) {
339 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
340 } else if (msg instanceof OFError) {
341 handleErrorMessage(sw, (OFError) msg);
// Converts a flow-removed message from the switch into a Flow (match, priority,
// idle timeout, cookie as id) and calls flowRemoved on registered notifiers
// that pass the container filter below.
// NOTE(review): the first operand of the filter condition is absent from this
// listing, so the complete filter cannot be stated here.
// NOTE(review): containerToNc.get(container) is dereferenced without a null
// check; if a notifier is registered for a container that has no entry in
// containerToNc this looks like it can throw NullPointerException - confirm.
345 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
346 Node node = NodeCreator.createOFNode(sw.getId());
347 Flow flow = new FlowConverter(msg.getMatch(),
348 new ArrayList<OFAction>(0)).getFlow(node);
349 flow.setPriority(msg.getPriority());
350 flow.setIdleTimeout(msg.getIdleTimeout());
351 flow.setId(msg.getCookie());
// The input port of the match, or null when the match has no IN_PORT field.
353 Match match = flow.getMatch();
354 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
355 .getField(MatchType.IN_PORT).getValue() : null;
357 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
359 String container = containerNotifier.getKey();
360 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
362 * Switch only provide us with the match information. For now let's
363 * try to identify the container membership only from the input port
364 * match field. In any case, upper layer consumers can derive
365 * whether the notification was not for them. More sophisticated
366 * filtering can be added later on.
369 || container.equals(GlobalConstants.DEFAULT.toString())
370 || this.containerToNc.get(container).contains(inPort)) {
371 notifier.flowRemoved(node, flow);
376 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
377 Node node = NodeCreator.createOFNode(sw.getId());
378 OFMessage offendingMsg = errorMsg.getOffendingMsg();
380 if (offendingMsg != null) {
381 xid = offendingMsg.getXid();
383 xid = errorMsg.getXid();
386 Long rid = getMessageRid(sw.getId(), xid);
388 * Null or zero requestId indicates that the error message is meant for
389 * a sync message. It will be handled by the sync message worker thread.
390 * Hence we are done here.
392 if ((rid == null) || (rid == 0)) {
397 * Notifies the caller that error has been reported for a previous flow
398 * programming request
400 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
402 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
403 notifier.flowErrorReported(node, rid, errorMsg);
// Container listener callback for a tag change on a node.
// NOTE(review): the body is not visible in this listing; no statements are
// shown for it - confirm whether it is intentionally empty.
408 public void tagUpdated(String containerName, Node n, short oldTag,
409 short newTag, UpdateType t) {
// Container listener callback for a container flow change.
// NOTE(review): the body is not visible in this listing; no statements are
// shown for it - confirm whether it is intentionally empty.
414 public void containerFlowUpdated(String containerName,
415 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
// Maintains containerToNc, the per-container set of node connectors.
// Visible behavior: one path creates the container's set on first use and adds
// the connector p; another path looks up the container's existing set.
// NOTE(review): the rest of the parameter list and the branching that selects
// between these paths (presumably on the update type), as well as what is done
// with 'target', are absent from this listing - confirm against the full file.
419 public void nodeConnectorUpdated(String containerName, NodeConnector p,
421 Set<NodeConnector> target = null;
425 if (!containerToNc.containsKey(containerName)) {
426 containerToNc.put(containerName, new HashSet<NodeConnector>());
428 containerToNc.get(containerName).add(p);
433 target = containerToNc.get(containerName);
434 if (target != null) {
// Container listener callback for a container mode change.
// NOTE(review): the body is not visible in this listing - confirm whether it
// is intentionally empty.
443 public void containerModeUpdated(UpdateType t) {
// Sends a Barrier message to the switch behind the node via
// ISwitch.syncSendBarrierMessage().
// Visible return statuses: NOTACCEPTABLE for a non-OpenFlow node, SUCCESS after
// the send, GONE with "The node does not have a valid Switch reference.",
// INTERNALERROR with "Failed to send Barrier message.".
// NOTE(review): lines are absent from this listing, including the check that
// guards the send and one line between the send and the SUCCESS return -
// confirm whether anything else happens there.
448 public Status syncSendBarrierMessage(Node node) {
449 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
450 return new Status(StatusCode.NOTACCEPTABLE,
451 "The node does not support Barrier message.");
454 if (controller != null) {
455 long swid = (Long) node.getID();
456 ISwitch sw = controller.getSwitch(swid);
458 sw.syncSendBarrierMessage();
460 return (new Status(StatusCode.SUCCESS));
462 return new Status(StatusCode.GONE,
463 "The node does not have a valid Switch reference.");
466 return new Status(StatusCode.INTERNALERROR,
467 "Failed to send Barrier message.");
// Sends a Barrier message to the switch behind the node via
// ISwitch.asyncSendBarrierMessage().
// Visible return statuses: NOTACCEPTABLE for a non-OpenFlow node, SUCCESS after
// the send, GONE with "The node does not have a valid Switch reference.",
// INTERNALERROR with "Failed to send Barrier message.".
// NOTE(review): lines are absent from this listing, including the check that
// guards the send and one line between the send and the SUCCESS return -
// confirm whether anything else happens there.
471 public Status asyncSendBarrierMessage(Node node) {
472 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
473 return new Status(StatusCode.NOTACCEPTABLE,
474 "The node does not support Barrier message.");
477 if (controller != null) {
478 long swid = (Long) node.getID();
479 ISwitch sw = controller.getSwitch(swid);
481 sw.asyncSendBarrierMessage();
483 return (new Status(StatusCode.SUCCESS));
485 return new Status(StatusCode.GONE,
486 "The node does not have a valid Switch reference.");
489 return new Status(StatusCode.INTERNALERROR,
490 "Failed to send Barrier message.");
494 * This method sends the message asynchronously until the number of messages
495 * sent reaches a threshold. Then a Barrier message is sent automatically
496 * for sync purpose. A unique Request ID associated with the message is
497 * passed down by the caller. The Request ID will be returned to the caller
498 * when an error message is received from the switch.
505 * The OF message to be sent
// Sends msg through sw.asyncSend, records the returned xid against the caller's
// rid for this switch, and sends a Barrier via asyncSendBarrierMessage whenever
// the switch's xid table size is a multiple of barrierMessagePriorCount.
// The result starts as Boolean.TRUE and is replaced by the Barrier Status when
// a Barrier is sent.
// NOTE(review): the declaration of 'xid', the body of the null check and the
// final return are absent from this listing - confirm against the full file.
// NOTE(review): 'size % barrierMessagePriorCount' would throw
// ArithmeticException if the configured count were 0 - confirm that the
// property is validated.
510 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
511 Object result = Boolean.TRUE;
512 long swid = (Long) node.getID();
515 xid = sw.asyncSend(msg);
516 addXid2Rid(swid, xid, rid);
518 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
519 if (swxid2rid == null) {
523 int size = swxid2rid.size();
524 if (size % barrierMessagePriorCount == 0) {
525 result = asyncSendBarrierMessage(node);
532 * A number of async messages are sent followed by a synchronous Barrier
533 * message. This method returns the maximum async messages that can be sent
534 * before the Barrier message.
536 * @return The max count of async messages sent prior to Barrier message
// Reads the system property "of.barrierMessagePriorCount" and parses it as an
// int.
// NOTE(review): the declaration and default value of 'rv', the guard around
// the parse, the catch body and the return are absent from this listing -
// confirm the default and what happens on a parse failure.
// NOTE(review): catching Exception here is broader than the
// NumberFormatException that Integer.parseInt throws.
538 private int getBarrierMessagePriorCount() {
539 String count = System.getProperty("of.barrierMessagePriorCount");
544 rv = Integer.parseInt(count);
545 } catch (Exception e) {
553 * This method returns the message Request ID previously assigned by the
554 * caller for a given OF message xid
560 * @return The Request ID
// Looks up the request id recorded for the given xid in the xid table of the
// given switch.
// NOTE(review): the declaration of 'rid', any checks before the lookup and the
// return statement are absent from this listing - confirm what is returned
// when the switch or the xid is unknown (callers test the result for null).
562 private Long getMessageRid(long swid, Integer xid) {
569 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
570 if (swxid2rid != null) {
571 rid = swxid2rid.get(xid);
577 * This method returns a copy of outstanding xid to rid mappings.for a given
582 * @return a copy of xid2rid mappings
584 public Map<Integer, Long> getSwXid2Rid(long swid) {
585 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
587 if (swxid2rid != null) {
588 return new HashMap<Integer, Long>(swxid2rid);
590 return new HashMap<Integer, Long>();
595 * Adds xid to rid mapping to the local DB
602 * The message Request ID
604 private void addXid2Rid(long swid, int xid, long rid) {
605 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
606 if (swxid2rid != null) {
607 swxid2rid.put(xid, rid);
612 * When an Error message is received, this method will be invoked to remove
613 * the offending xid from the local DB.
620 private void removeXid2Rid(long swid, int xid) {
621 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
622 if (swxid2rid != null) {
623 swxid2rid.remove(xid);
628 * Convert various result into Status
631 * The returned result from previous action
633 * add/modify/delete flow action
635 * The Request ID associated with the flow message
638 private Status getStatusInternal(Object result, String action, long rid) {
639 if (result instanceof Boolean) {
640 return ((Boolean) result == Boolean.TRUE) ? new Status(
641 StatusCode.SUCCESS, rid) : new Status(
642 StatusCode.TIMEOUT, errorString(null, action,
643 "Request Timed Out"));
644 } else if (result instanceof Status) {
645 return (Status) result;
646 } else if (result instanceof OFError) {
647 OFError res = (OFError) result;
648 return new Status(StatusCode.INTERNALERROR, errorString(
649 "program", action, Utils.getOFErrorString(res)));
651 return new Status(StatusCode.INTERNALERROR, errorString(
652 "send", action, "Internal Error"));
657 * When a Barrier reply is received, this method will be invoked to clear
// Looks up the xid table of the given switch and acts on it when present.
// NOTE(review): the statement inside the null check is absent from this
// listing; the name and the surrounding comment suggest the table is emptied -
// confirm against the full file.
663 private void clearXid2Rid(long swid) {
664 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
665 if (swxid2rid != null) {
// Inventory callback that keeps xid2rid in step with the known switches:
// one path installs a fresh, empty xid table for the switch id, another
// removes the switch's table.
// NOTE(review): the branching that selects between these paths (presumably on
// 'type') is absent from this listing - confirm against the full file.
// NOTE(review): the per-switch table is a plain HashMap while the outer map is
// concurrent - confirm which threads touch the inner table.
671 public void updateNode(Node node, UpdateType type, Set<Property> props) {
672 long swid = (Long)node.getID();
676 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
677 this.xid2rid.put(swid, swxid2rid);
682 this.xid2rid.remove(swid);
// Inventory callback for node connector changes.
// NOTE(review): the body is not visible in this listing - confirm whether it
// is intentionally empty.
689 public void updateNodeConnector(NodeConnector nodeConnector,
690 UpdateType type, Set<Property> props) {
// Registers this object as a CommandProvider service so that its console
// commands become available.
// NOTE(review): the continuation lines of both statements are absent from this
// listing (how the BundleContext is obtained from the bundle, and the
// remaining registerService argument) - confirm against the full file.
693 private void registerWithOSGIConsole() {
694 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
696 bundleContext.registerService(CommandProvider.class.getName(), this,
701 public String getHelp() {
702 StringBuffer help = new StringBuffer();
703 help.append("-- Flow Programmer Service --\n");
704 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
705 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
706 return help.toString();
// Console command "px2r <node id>": prints the outstanding xid to rid mappings
// of one switch.
// The argument is parsed with HexEncode.stringToLong; an unparsable id or an
// id without an xid table produces an error line instead of the listing.
// NOTE(review): the guards, early returns, the declaration of 'sid' and the
// 'try' opener are absent from this listing - confirm against the full file.
709 public void _px2r(CommandInterpreter ci) {
710 String st = ci.nextArgument();
712 ci.println("Please enter a valid node id");
718 sid = HexEncode.stringToLong(st);
719 } catch (NumberFormatException e) {
720 ci.println("Please enter a valid node id");
724 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
725 if (swxid2rid == null) {
726 ci.println("The node id entered does not exist");
// Header row, then one row per outstanding xid.
730 ci.println("xid rid");
732 Set<Integer> xidSet = swxid2rid.keySet();
733 if (xidSet == null) {
737 for (Integer xid : xidSet) {
738 ci.println(xid + " " + swxid2rid.get(xid));
742 public void _px2rc(CommandInterpreter ci) {
743 ci.println("Max num of async messages sent prior to the Barrier message is "
744 + barrierMessagePriorCount);