2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.HashSet;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
20 import org.eclipse.osgi.framework.console.CommandInterpreter;
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
23 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
27 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Error;
28 import org.openflow.protocol.OFError;
29 import org.openflow.protocol.OFFlowMod;
30 import org.openflow.protocol.OFFlowRemoved;
31 import org.openflow.protocol.OFMessage;
32 import org.openflow.protocol.OFPort;
33 import org.openflow.protocol.OFType;
34 import org.openflow.protocol.action.OFAction;
36 import org.opendaylight.controller.sal.core.ContainerFlow;
37 import org.opendaylight.controller.sal.core.IContainerListener;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.Node.NodeIDType;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
45 import org.opendaylight.controller.sal.match.Match;
46 import org.opendaylight.controller.sal.match.MatchType;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.opendaylight.controller.sal.utils.HexEncode;
49 import org.opendaylight.controller.sal.utils.NodeCreator;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.controller.sal.utils.Status;
52 import org.osgi.framework.BundleContext;
53 import org.osgi.framework.FrameworkUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * Represents the openflow plugin component in charge of programming the flows
59 * the flow programming and relay them to functional modules above SAL.
61 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
62 IMessageListener, IContainerListener, IInventoryShimExternalListener,
64 private static final Logger log = LoggerFactory
65 .getLogger(FlowProgrammerService.class);
66 private IController controller;
67 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
68 private Map<String, Set<NodeConnector>> containerToNc;
69 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
70 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
72 public FlowProgrammerService() {
74 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
75 containerToNc = new HashMap<String, Set<NodeConnector>>();
76 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
79 public void setController(IController core) {
80 this.controller = core;
83 public void unsetController(IController core) {
84 if (this.controller == core) {
85 this.controller = null;
89 public void setFlowProgrammerNotifier(Map<String, ?> props,
90 IFlowProgrammerNotifier s) {
91 if (props == null || props.get("containerName") == null) {
92 log.error("Didn't receive the service correct properties");
95 String containerName = (String) props.get("containerName");
96 this.flowProgrammerNotifiers.put(containerName, s);
99 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
100 IFlowProgrammerNotifier s) {
101 if (props == null || props.get("containerName") == null) {
102 log.error("Didn't receive the service correct properties");
105 String containerName = (String) props.get("containerName");
106 if (this.flowProgrammerNotifiers != null
107 && this.flowProgrammerNotifiers.containsKey(containerName)
108 && this.flowProgrammerNotifiers.get(containerName) == s) {
109 this.flowProgrammerNotifiers.remove(containerName);
114 * Function called by the dependency manager when all the required
115 * dependencies are satisfied
119 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
120 this.controller.addMessageListener(OFType.ERROR, this);
121 registerWithOSGIConsole();
125 * Function called by the dependency manager when at least one dependency
126 * become unsatisfied or when the component is shutting down because for
127 * example bundle is being stopped.
134 * Function called by dependency manager after "init ()" is called and after
135 * the services provided by the class are registered in the service registry
142 * Function called by the dependency manager before the services exported by
143 * the component are unregistered, this will be followed by a "destroy ()"
151 public Status addFlow(Node node, Flow flow) {
152 return addFlowInternal(node, flow, 0);
156 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
157 return modifyFlowInternal(node, oldFlow, newFlow, 0);
161 public Status removeFlow(Node node, Flow flow) {
162 return removeFlowInternal(node, flow, 0);
166 public Status addFlowAsync(Node node, Flow flow, long rid) {
167 return addFlowInternal(node, flow, rid);
171 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
173 return modifyFlowInternal(node, oldFlow, newFlow, rid);
177 public Status removeFlowAsync(Node node, Flow flow, long rid) {
178 return removeFlowInternal(node, flow, rid);
181 private Status addFlowInternal(Node node, Flow flow, long rid) {
182 String action = "add";
183 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
184 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
185 action, "Invalid node type"));
188 if (controller != null) {
189 ISwitch sw = controller.getSwitch((Long) node.getID());
191 FlowConverter x = new FlowConverter(flow);
192 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
197 * Synchronous message send. Each message is followed by a
200 result = sw.syncSend(msg);
203 * Message will be sent asynchronously. A Barrier message
204 * will be inserted automatically to synchronize the
207 result = asyncMsgSend(node, sw, msg, rid);
209 if (result instanceof Boolean) {
210 return ((Boolean) result == Boolean.TRUE) ? new Status(
211 StatusCode.SUCCESS, null) : new Status(
212 StatusCode.TIMEOUT, errorString(null, action,
213 "Request Timed Out"));
214 } else if (result instanceof OFError) {
215 OFError res = (OFError) result;
216 if (res.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
217 V6Error er = new V6Error(res);
218 byte[] b = res.getError();
219 ByteBuffer bb = ByteBuffer.allocate(b.length);
223 return new Status(StatusCode.INTERNALERROR,
224 errorString("program", action,
225 "Vendor Extension Internal Error"));
227 return new Status(StatusCode.INTERNALERROR, errorString(
228 "program", action, Utils.getOFErrorString(res)));
230 return new Status(StatusCode.INTERNALERROR, errorString(
231 "send", action, "Internal Error"));
234 return new Status(StatusCode.GONE, errorString("send", action,
235 "Switch is not available"));
238 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
239 "Internal plugin error"));
242 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
243 String action = "modify";
244 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
245 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
246 action, "Invalid node type"));
248 if (controller != null) {
249 ISwitch sw = controller.getSwitch((Long) node.getID());
251 OFMessage msg1 = null, msg2 = null;
253 // If priority and match portion are the same, send a
254 // modification message
255 if (oldFlow.getPriority() != newFlow.getPriority()
256 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
257 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
258 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
259 msg2 = new FlowConverter(newFlow).getOFFlowMod(
260 OFFlowMod.OFPFC_ADD, null);
262 msg1 = new FlowConverter(newFlow).getOFFlowMod(
263 OFFlowMod.OFPFC_MODIFY_STRICT, null);
266 * Synchronous message send
268 action = (msg2 == null) ? "modify" : "delete";
272 * Synchronous message send. Each message is followed by a
275 result = sw.syncSend(msg1);
278 * Message will be sent asynchronously. A Barrier message
279 * will be inserted automatically to synchronize the
282 result = asyncMsgSend(node, sw, msg1, rid);
284 if (result instanceof Boolean) {
285 if ((Boolean) result == Boolean.FALSE) {
286 return new Status(StatusCode.TIMEOUT, errorString(null,
287 action, "Request Timed Out"));
288 } else if (msg2 == null) {
289 return new Status(StatusCode.SUCCESS, null);
291 } else if (result instanceof OFError) {
292 return new Status(StatusCode.INTERNALERROR, errorString(
294 Utils.getOFErrorString((OFError) result)));
296 return new Status(StatusCode.INTERNALERROR, errorString(
297 "send", action, "Internal Error"));
304 * Synchronous message send. Each message is followed by a
307 result = sw.syncSend(msg2);
310 * Message will be sent asynchronously. A Barrier message
311 * will be inserted automatically to synchronize the
314 result = asyncMsgSend(node, sw, msg2, rid);
316 if (result instanceof Boolean) {
317 return ((Boolean) result == Boolean.TRUE) ? new Status(
318 StatusCode.SUCCESS, null) : new Status(
319 StatusCode.TIMEOUT, errorString(null, action,
320 "Request Timed Out"));
321 } else if (result instanceof OFError) {
322 return new Status(StatusCode.INTERNALERROR,
323 errorString("program", action, Utils
324 .getOFErrorString((OFError) result)));
326 return new Status(StatusCode.INTERNALERROR,
327 errorString("send", action, "Internal Error"));
331 return new Status(StatusCode.GONE, errorString("send", action,
332 "Switch is not available"));
335 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
336 "Internal plugin error"));
339 private Status removeFlowInternal(Node node, Flow flow, long rid) {
340 String action = "remove";
341 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
342 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
343 action, "Invalid node type"));
345 if (controller != null) {
346 ISwitch sw = controller.getSwitch((Long) node.getID());
348 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
349 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
353 * Synchronous message send. Each message is followed by a
356 result = sw.syncSend(msg);
359 * Message will be sent asynchronously. A Barrier message
360 * will be inserted automatically to synchronize the
363 result = asyncMsgSend(node, sw, msg, rid);
365 if (result instanceof Boolean) {
366 return ((Boolean) result == Boolean.TRUE) ? new Status(
367 StatusCode.SUCCESS, null) : new Status(
368 StatusCode.TIMEOUT, errorString(null, action,
369 "Request Timed Out"));
370 } else if (result instanceof OFError) {
371 return new Status(StatusCode.INTERNALERROR, errorString(
373 Utils.getOFErrorString((OFError) result)));
375 return new Status(StatusCode.INTERNALERROR, errorString(
376 "send", action, "Internal Error"));
379 return new Status(StatusCode.GONE, errorString("send", action,
380 "Switch is not available"));
383 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
384 "Internal plugin error"));
388 public Status removeAllFlows(Node node) {
389 return new Status(StatusCode.SUCCESS, null);
392 private String errorString(String phase, String action, String cause) {
394 + ((phase != null) ? phase + " the " + action
395 + " flow message: " : action + " the flow: ") + cause;
399 public void receive(ISwitch sw, OFMessage msg) {
400 if (msg instanceof OFFlowRemoved) {
401 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
402 } else if (msg instanceof OFError) {
403 handleErrorMessage(sw, (OFError) msg);
407 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
408 Node node = NodeCreator.createOFNode(sw.getId());
409 Flow flow = new FlowConverter(msg.getMatch(),
410 new ArrayList<OFAction>(0)).getFlow(node);
411 flow.setPriority(msg.getPriority());
412 flow.setIdleTimeout(msg.getIdleTimeout());
413 flow.setId(msg.getCookie());
415 Match match = flow.getMatch();
416 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
417 .getField(MatchType.IN_PORT).getValue() : null;
419 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
421 String container = containerNotifier.getKey();
422 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
424 * Switch only provide us with the match information. For now let's
425 * try to identify the container membership only from the input port
426 * match field. In any case, upper layer consumers can derive
427 * whether the notification was not for them. More sophisticated
428 * filtering can be added later on.
431 || container.equals(GlobalConstants.DEFAULT.toString())
432 || this.containerToNc.get(container).contains(inPort)) {
433 notifier.flowRemoved(node, flow);
438 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
439 Node node = NodeCreator.createOFNode(sw.getId());
440 OFMessage offendingMsg = errorMsg.getOffendingMsg();
442 if (offendingMsg != null) {
443 xid = offendingMsg.getXid();
445 xid = errorMsg.getXid();
448 Long rid = getMessageRid(sw.getId(), xid);
450 * Null or zero requestId indicates that the error message is meant for
451 * a sync message. It will be handled by the sync message worker thread.
452 * Hence we are done here.
454 if ((rid == null) || (rid == 0)) {
459 * Notifies the caller that error has been reported for a previous flow
460 * programming request
462 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
464 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
465 notifier.flowErrorReported(node, rid, errorMsg);
470 public void tagUpdated(String containerName, Node n, short oldTag,
471 short newTag, UpdateType t) {
476 public void containerFlowUpdated(String containerName,
477 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
481 public void nodeConnectorUpdated(String containerName, NodeConnector p,
483 Set<NodeConnector> target = null;
487 if (!containerToNc.containsKey(containerName)) {
488 containerToNc.put(containerName, new HashSet<NodeConnector>());
490 containerToNc.get(containerName).add(p);
495 target = containerToNc.get(containerName);
496 if (target != null) {
505 public void containerModeUpdated(UpdateType t) {
510 public Status sendBarrierMessage(Node node) {
511 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
512 return new Status(StatusCode.NOTACCEPTABLE,
513 "The node does not support Barrier message.");
516 if (controller != null) {
517 long swid = (Long) node.getID();
518 ISwitch sw = controller.getSwitch(swid);
520 sw.sendBarrierMessage();
522 return (new Status(StatusCode.SUCCESS, null));
524 return new Status(StatusCode.GONE,
525 "The node does not have a valid Switch reference.");
528 return new Status(StatusCode.INTERNALERROR,
529 "Failed to send Barrier message.");
533 * This method sends the message asynchronously until the number of messages
534 * sent reaches a threshold. Then a Barrier message is sent automatically
535 * for sync purpose. An unique Request ID associated with the message is
536 * passed down by the caller. The Request ID will be returned to the caller
537 * when an error message is received from the switch.
544 * The OF message to be sent
549 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
550 Object result = Boolean.TRUE;
551 long swid = (Long) node.getID();
554 xid = sw.asyncSend(msg);
555 addXid2Rid(swid, xid, rid);
557 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
558 if (swxid2rid == null) {
562 int size = swxid2rid.size();
563 if (size % barrierMessagePriorCount == 0) {
564 result = sendBarrierMessage(node);
571 * A number of async messages are sent followed by a synchronous Barrier
572 * message. This method returns the maximum async messages that can be sent
573 * before the Barrier message.
575 * @return The max count of async messages sent prior to Barrier message
577 private int getBarrierMessagePriorCount() {
578 String count = System.getProperty("of.barrierMessagePriorCount");
583 rv = Integer.parseInt(count);
584 } catch (Exception e) {
592 * This method returns the message Request ID previously assigned by the
593 * caller for a given OF message xid
599 * @return The Request ID
601 private Long getMessageRid(long swid, Integer xid) {
608 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
609 if (swxid2rid != null) {
610 rid = swxid2rid.get(xid);
616 * This method returns a copy of outstanding xid to rid mappings.for a given
621 * @return a copy of xid2rid mappings
623 public Map<Integer, Long> getSwXid2Rid(long swid) {
624 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
626 if (swxid2rid != null) {
627 return new HashMap<Integer, Long>(swxid2rid);
629 return new HashMap<Integer, Long>();
634 * Adds xid to rid mapping to the local DB
641 * The message Request ID
643 private void addXid2Rid(long swid, int xid, long rid) {
644 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
645 if (swxid2rid != null) {
646 swxid2rid.put(xid, rid);
651 * When an Error message is received, this method will be invoked to remove
652 * the offending xid from the local DB.
659 private void removeXid2Rid(long swid, int xid) {
660 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
661 if (swxid2rid != null) {
662 swxid2rid.remove(xid);
667 * When a Barrier reply is received, this method will be invoked to clear
673 private void clearXid2Rid(long swid) {
674 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
675 if (swxid2rid != null) {
681 public void updateNode(Node node, UpdateType type, Set<Property> props) {
682 long swid = (Long)node.getID();
686 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
687 this.xid2rid.put(swid, swxid2rid);
692 this.xid2rid.remove(swid);
699 public void updateNodeConnector(NodeConnector nodeConnector,
700 UpdateType type, Set<Property> props) {
703 private void registerWithOSGIConsole() {
704 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
706 bundleContext.registerService(CommandProvider.class.getName(), this,
711 public String getHelp() {
712 StringBuffer help = new StringBuffer();
713 help.append("-- Flow Programmer Service --\n");
714 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
715 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
716 return help.toString();
719 public void _px2r(CommandInterpreter ci) {
720 String st = ci.nextArgument();
722 ci.println("Please enter a valid node id");
728 sid = HexEncode.stringToLong(st);
729 } catch (NumberFormatException e) {
730 ci.println("Please enter a valid node id");
734 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
735 if (swxid2rid == null) {
736 ci.println("The node id entered does not exist");
740 ci.println("xid rid");
742 Set<Integer> xidSet = swxid2rid.keySet();
743 if (xidSet == null) {
747 for (Integer xid : xidSet) {
748 ci.println(xid + " " + swxid2rid.get(xid));
752 public void _px2rc(CommandInterpreter ci) {
753 ci.println("Max num of async messages sent prior to the Barrier message is "
754 + barrierMessagePriorCount);