2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
19 import org.eclipse.osgi.framework.console.CommandInterpreter;
20 import org.eclipse.osgi.framework.console.CommandProvider;
21 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
22 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
23 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.sal.core.ContainerFlow;
27 import org.opendaylight.controller.sal.core.IContainerListener;
28 import org.opendaylight.controller.sal.core.Node;
29 import org.opendaylight.controller.sal.core.Node.NodeIDType;
30 import org.opendaylight.controller.sal.core.NodeConnector;
31 import org.opendaylight.controller.sal.core.Property;
32 import org.opendaylight.controller.sal.core.UpdateType;
33 import org.opendaylight.controller.sal.flowprogrammer.Flow;
34 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
35 import org.opendaylight.controller.sal.match.Match;
36 import org.opendaylight.controller.sal.match.MatchType;
37 import org.opendaylight.controller.sal.utils.GlobalConstants;
38 import org.opendaylight.controller.sal.utils.HexEncode;
39 import org.opendaylight.controller.sal.utils.NodeCreator;
40 import org.opendaylight.controller.sal.utils.Status;
41 import org.opendaylight.controller.sal.utils.StatusCode;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFFlowRemoved;
45 import org.openflow.protocol.OFMessage;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFType;
48 import org.openflow.protocol.action.OFAction;
49 import org.osgi.framework.BundleContext;
50 import org.osgi.framework.FrameworkUtil;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
 * Represents the openflow plugin component in charge of programming the flows
 * on the switches and of relaying flow programming events to the functional
 * modules above SAL.
58 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
59 IMessageListener, IContainerListener, IInventoryShimExternalListener,
61 private static final Logger log = LoggerFactory
62 .getLogger(FlowProgrammerService.class);
63 private IController controller;
64 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
65 private Map<String, Set<NodeConnector>> containerToNc;
66 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
67 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
/**
 * Creates the service with empty notifier, container-to-connector and
 * xid-to-rid tables. The controller reference stays unset until
 * {@code setController} is called.
 */
public FlowProgrammerService() {
    this.flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
    this.containerToNc = new HashMap<String, Set<NodeConnector>>();
    this.xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
}
76 public void setController(IController core) {
77 this.controller = core;
80 public void unsetController(IController core) {
81 if (this.controller == core) {
82 this.controller = null;
86 public void setFlowProgrammerNotifier(Map<String, ?> props,
87 IFlowProgrammerNotifier s) {
88 if (props == null || props.get("containerName") == null) {
89 log.error("Didn't receive the service correct properties");
92 String containerName = (String) props.get("containerName");
93 this.flowProgrammerNotifiers.put(containerName, s);
96 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
97 IFlowProgrammerNotifier s) {
98 if (props == null || props.get("containerName") == null) {
99 log.error("Didn't receive the service correct properties");
102 String containerName = (String) props.get("containerName");
103 if (this.flowProgrammerNotifiers != null
104 && this.flowProgrammerNotifiers.containsKey(containerName)
105 && this.flowProgrammerNotifiers.get(containerName) == s) {
106 this.flowProgrammerNotifiers.remove(containerName);
111 * Function called by the dependency manager when all the required
112 * dependencies are satisfied
116 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
117 this.controller.addMessageListener(OFType.ERROR, this);
118 registerWithOSGIConsole();
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down because, for
 * example, the bundle is being stopped.
131 * Function called by dependency manager after "init ()" is called and after
132 * the services provided by the class are registered in the service registry
 * Function called by the dependency manager before the services exported by
 * the component are unregistered; this will be followed by a "destroy ()"
 * call.
148 public Status addFlow(Node node, Flow flow) {
149 return addFlowInternal(node, flow, 0);
153 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
154 return modifyFlowInternal(node, oldFlow, newFlow, 0);
158 public Status removeFlow(Node node, Flow flow) {
159 return removeFlowInternal(node, flow, 0);
163 public Status addFlowAsync(Node node, Flow flow, long rid) {
164 return addFlowInternal(node, flow, rid);
168 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
170 return modifyFlowInternal(node, oldFlow, newFlow, rid);
174 public Status removeFlowAsync(Node node, Flow flow, long rid) {
175 return removeFlowInternal(node, flow, rid);
178 private Status addFlowInternal(Node node, Flow flow, long rid) {
179 String action = "add";
180 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
181 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
182 action, "Invalid node type"));
185 if (controller != null) {
186 ISwitch sw = controller.getSwitch((Long) node.getID());
188 FlowConverter x = new FlowConverter(flow);
189 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
194 * Synchronous message send. Each message is followed by a
197 result = sw.syncSend(msg);
200 * Message will be sent asynchronously. A Barrier message
201 * will be inserted automatically to synchronize the
204 result = asyncMsgSend(node, sw, msg, rid);
206 if (result instanceof Boolean) {
207 return ((Boolean) result == Boolean.TRUE) ? new Status(
208 StatusCode.SUCCESS, rid) : new Status(
209 StatusCode.TIMEOUT, errorString(null, action,
210 "Request Timed Out"));
211 } else if (result instanceof OFError) {
212 OFError res = (OFError) result;
213 return new Status(StatusCode.INTERNALERROR, errorString(
214 "program", action, Utils.getOFErrorString(res)));
216 return new Status(StatusCode.INTERNALERROR, errorString(
217 "send", action, "Internal Error"));
220 return new Status(StatusCode.GONE, errorString("send", action,
221 "Switch is not available"));
224 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
225 "Internal plugin error"));
228 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
229 String action = "modify";
230 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
231 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
232 action, "Invalid node type"));
234 if (controller != null) {
235 ISwitch sw = controller.getSwitch((Long) node.getID());
237 OFMessage msg1 = null, msg2 = null;
239 // If priority and match portion are the same, send a
240 // modification message
241 if (oldFlow.getPriority() != newFlow.getPriority()
242 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
243 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
244 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
245 msg2 = new FlowConverter(newFlow).getOFFlowMod(
246 OFFlowMod.OFPFC_ADD, null);
248 msg1 = new FlowConverter(newFlow).getOFFlowMod(
249 OFFlowMod.OFPFC_MODIFY_STRICT, null);
252 * Synchronous message send
254 action = (msg2 == null) ? "modify" : "delete";
258 * Synchronous message send. Each message is followed by a
261 result = sw.syncSend(msg1);
264 * Message will be sent asynchronously. A Barrier message
265 * will be inserted automatically to synchronize the
268 result = asyncMsgSend(node, sw, msg1, rid);
270 if (result instanceof Boolean) {
271 if ((Boolean) result == Boolean.FALSE) {
272 return new Status(StatusCode.TIMEOUT, errorString(null,
273 action, "Request Timed Out"));
274 } else if (msg2 == null) {
275 return new Status(StatusCode.SUCCESS, rid);
277 } else if (result instanceof OFError) {
278 return new Status(StatusCode.INTERNALERROR, errorString(
280 Utils.getOFErrorString((OFError) result)));
282 return new Status(StatusCode.INTERNALERROR, errorString(
283 "send", action, "Internal Error"));
290 * Synchronous message send. Each message is followed by a
293 result = sw.syncSend(msg2);
296 * Message will be sent asynchronously. A Barrier message
297 * will be inserted automatically to synchronize the
300 result = asyncMsgSend(node, sw, msg2, rid);
302 if (result instanceof Boolean) {
303 return ((Boolean) result == Boolean.TRUE) ? new Status(
304 StatusCode.SUCCESS, rid) : new Status(
305 StatusCode.TIMEOUT, errorString(null, action,
306 "Request Timed Out"));
307 } else if (result instanceof OFError) {
308 return new Status(StatusCode.INTERNALERROR,
309 errorString("program", action, Utils
310 .getOFErrorString((OFError) result)));
312 return new Status(StatusCode.INTERNALERROR,
313 errorString("send", action, "Internal Error"));
317 return new Status(StatusCode.GONE, errorString("send", action,
318 "Switch is not available"));
321 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
322 "Internal plugin error"));
325 private Status removeFlowInternal(Node node, Flow flow, long rid) {
326 String action = "remove";
327 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
328 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
329 action, "Invalid node type"));
331 if (controller != null) {
332 ISwitch sw = controller.getSwitch((Long) node.getID());
334 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
335 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
339 * Synchronous message send. Each message is followed by a
342 result = sw.syncSend(msg);
345 * Message will be sent asynchronously. A Barrier message
346 * will be inserted automatically to synchronize the
349 result = asyncMsgSend(node, sw, msg, rid);
351 if (result instanceof Boolean) {
352 return ((Boolean) result == Boolean.TRUE) ? new Status(
353 StatusCode.SUCCESS, rid) : new Status(
354 StatusCode.TIMEOUT, errorString(null, action,
355 "Request Timed Out"));
356 } else if (result instanceof OFError) {
357 return new Status(StatusCode.INTERNALERROR, errorString(
359 Utils.getOFErrorString((OFError) result)));
361 return new Status(StatusCode.INTERNALERROR, errorString(
362 "send", action, "Internal Error"));
365 return new Status(StatusCode.GONE, errorString("send", action,
366 "Switch is not available"));
369 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
370 "Internal plugin error"));
374 public Status removeAllFlows(Node node) {
375 return new Status(StatusCode.SUCCESS, null);
378 private String errorString(String phase, String action, String cause) {
380 + ((phase != null) ? phase + " the " + action
381 + " flow message: " : action + " the flow: ") + cause;
385 public void receive(ISwitch sw, OFMessage msg) {
386 if (msg instanceof OFFlowRemoved) {
387 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
388 } else if (msg instanceof OFError) {
389 handleErrorMessage(sw, (OFError) msg);
393 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
394 Node node = NodeCreator.createOFNode(sw.getId());
395 Flow flow = new FlowConverter(msg.getMatch(),
396 new ArrayList<OFAction>(0)).getFlow(node);
397 flow.setPriority(msg.getPriority());
398 flow.setIdleTimeout(msg.getIdleTimeout());
399 flow.setId(msg.getCookie());
401 Match match = flow.getMatch();
402 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
403 .getField(MatchType.IN_PORT).getValue() : null;
405 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
407 String container = containerNotifier.getKey();
408 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
410 * Switch only provide us with the match information. For now let's
411 * try to identify the container membership only from the input port
412 * match field. In any case, upper layer consumers can derive
413 * whether the notification was not for them. More sophisticated
414 * filtering can be added later on.
417 || container.equals(GlobalConstants.DEFAULT.toString())
418 || this.containerToNc.get(container).contains(inPort)) {
419 notifier.flowRemoved(node, flow);
424 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
425 Node node = NodeCreator.createOFNode(sw.getId());
426 OFMessage offendingMsg = errorMsg.getOffendingMsg();
428 if (offendingMsg != null) {
429 xid = offendingMsg.getXid();
431 xid = errorMsg.getXid();
434 Long rid = getMessageRid(sw.getId(), xid);
436 * Null or zero requestId indicates that the error message is meant for
437 * a sync message. It will be handled by the sync message worker thread.
438 * Hence we are done here.
440 if ((rid == null) || (rid == 0)) {
445 * Notifies the caller that error has been reported for a previous flow
446 * programming request
448 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
450 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
451 notifier.flowErrorReported(node, rid, errorMsg);
456 public void tagUpdated(String containerName, Node n, short oldTag,
457 short newTag, UpdateType t) {
462 public void containerFlowUpdated(String containerName,
463 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
467 public void nodeConnectorUpdated(String containerName, NodeConnector p,
469 Set<NodeConnector> target = null;
473 if (!containerToNc.containsKey(containerName)) {
474 containerToNc.put(containerName, new HashSet<NodeConnector>());
476 containerToNc.get(containerName).add(p);
481 target = containerToNc.get(containerName);
482 if (target != null) {
491 public void containerModeUpdated(UpdateType t) {
496 public Status sendBarrierMessage(Node node) {
497 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
498 return new Status(StatusCode.NOTACCEPTABLE,
499 "The node does not support Barrier message.");
502 if (controller != null) {
503 long swid = (Long) node.getID();
504 ISwitch sw = controller.getSwitch(swid);
506 sw.sendBarrierMessage();
508 return (new Status(StatusCode.SUCCESS, null));
510 return new Status(StatusCode.GONE,
511 "The node does not have a valid Switch reference.");
514 return new Status(StatusCode.INTERNALERROR,
515 "Failed to send Barrier message.");
519 * This method sends the message asynchronously until the number of messages
520 * sent reaches a threshold. Then a Barrier message is sent automatically
521 * for sync purpose. An unique Request ID associated with the message is
522 * passed down by the caller. The Request ID will be returned to the caller
523 * when an error message is received from the switch.
530 * The OF message to be sent
535 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
536 Object result = Boolean.TRUE;
537 long swid = (Long) node.getID();
540 xid = sw.asyncSend(msg);
541 addXid2Rid(swid, xid, rid);
543 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
544 if (swxid2rid == null) {
548 int size = swxid2rid.size();
549 if (size % barrierMessagePriorCount == 0) {
550 result = sendBarrierMessage(node);
557 * A number of async messages are sent followed by a synchronous Barrier
558 * message. This method returns the maximum async messages that can be sent
559 * before the Barrier message.
561 * @return The max count of async messages sent prior to Barrier message
563 private int getBarrierMessagePriorCount() {
564 String count = System.getProperty("of.barrierMessagePriorCount");
569 rv = Integer.parseInt(count);
570 } catch (Exception e) {
578 * This method returns the message Request ID previously assigned by the
579 * caller for a given OF message xid
585 * @return The Request ID
587 private Long getMessageRid(long swid, Integer xid) {
594 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
595 if (swxid2rid != null) {
596 rid = swxid2rid.get(xid);
602 * This method returns a copy of outstanding xid to rid mappings.for a given
607 * @return a copy of xid2rid mappings
609 public Map<Integer, Long> getSwXid2Rid(long swid) {
610 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
612 if (swxid2rid != null) {
613 return new HashMap<Integer, Long>(swxid2rid);
615 return new HashMap<Integer, Long>();
620 * Adds xid to rid mapping to the local DB
627 * The message Request ID
629 private void addXid2Rid(long swid, int xid, long rid) {
630 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
631 if (swxid2rid != null) {
632 swxid2rid.put(xid, rid);
637 * When an Error message is received, this method will be invoked to remove
638 * the offending xid from the local DB.
645 private void removeXid2Rid(long swid, int xid) {
646 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
647 if (swxid2rid != null) {
648 swxid2rid.remove(xid);
653 * When a Barrier reply is received, this method will be invoked to clear
659 private void clearXid2Rid(long swid) {
660 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
661 if (swxid2rid != null) {
667 public void updateNode(Node node, UpdateType type, Set<Property> props) {
668 long swid = (Long)node.getID();
672 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
673 this.xid2rid.put(swid, swxid2rid);
678 this.xid2rid.remove(swid);
685 public void updateNodeConnector(NodeConnector nodeConnector,
686 UpdateType type, Set<Property> props) {
689 private void registerWithOSGIConsole() {
690 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
692 bundleContext.registerService(CommandProvider.class.getName(), this,
697 public String getHelp() {
698 StringBuffer help = new StringBuffer();
699 help.append("-- Flow Programmer Service --\n");
700 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
701 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
702 return help.toString();
705 public void _px2r(CommandInterpreter ci) {
706 String st = ci.nextArgument();
708 ci.println("Please enter a valid node id");
714 sid = HexEncode.stringToLong(st);
715 } catch (NumberFormatException e) {
716 ci.println("Please enter a valid node id");
720 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
721 if (swxid2rid == null) {
722 ci.println("The node id entered does not exist");
726 ci.println("xid rid");
728 Set<Integer> xidSet = swxid2rid.keySet();
729 if (xidSet == null) {
733 for (Integer xid : xidSet) {
734 ci.println(xid + " " + swxid2rid.get(xid));
738 public void _px2rc(CommandInterpreter ci) {
739 ci.println("Max num of async messages sent prior to the Barrier message is "
740 + barrierMessagePriorCount);