2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.internal;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.HashSet;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.Future;
20 import org.eclipse.osgi.framework.console.CommandInterpreter;
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
23 import org.opendaylight.controller.sal.core.ContainerFlow;
24 import org.opendaylight.controller.sal.core.IContainerListener;
25 import org.opendaylight.controller.sal.core.Node;
26 import org.opendaylight.controller.sal.core.Node.NodeIDType;
27 import org.opendaylight.controller.sal.core.NodeConnector;
28 import org.opendaylight.controller.sal.core.Property;
29 import org.opendaylight.controller.sal.core.UpdateType;
30 import org.opendaylight.controller.sal.flowprogrammer.Flow;
31 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
32 import org.opendaylight.controller.sal.match.Match;
33 import org.opendaylight.controller.sal.match.MatchType;
34 import org.opendaylight.controller.sal.utils.GlobalConstants;
35 import org.opendaylight.controller.sal.utils.HexEncode;
36 import org.opendaylight.controller.sal.utils.NodeCreator;
37 import org.opendaylight.controller.sal.utils.Status;
38 import org.opendaylight.controller.sal.utils.StatusCode;
39 import org.opendaylight.openflowplugin.openflow.IFlowProgrammerNotifier;
40 import org.opendaylight.openflowplugin.openflow.IInventoryShimExternalListener;
41 import org.opendaylight.openflowplugin.openflow.core.IController;
42 import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
43 import org.opendaylight.openflowplugin.openflow.core.ISwitch;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.openflow.protocol.OFError;
53 import org.openflow.protocol.OFFlowMod;
54 import org.openflow.protocol.OFFlowRemoved;
55 import org.openflow.protocol.OFMessage;
56 import org.openflow.protocol.OFPort;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.action.OFAction;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
 * Represents the OpenFlow plugin component in charge of programming flows on
 * the switches and of relaying flow-related notifications (flow removed, flow
 * errors) to the functional modules above SAL.
68 public class FlowProgrammerService implements IPluginInFlowProgrammerService, SalFlowService,
69 IMessageListener, IContainerListener, IInventoryShimExternalListener,
71 private static final Logger log = LoggerFactory
72 .getLogger(FlowProgrammerService.class);
73 private IController controller;
74 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
75 private Map<String, Set<NodeConnector>> containerToNc;
76 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
77 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
78 private IPluginOutConnectionService connectionOutService;
80 public FlowProgrammerService() {
82 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
83 containerToNc = new HashMap<String, Set<NodeConnector>>();
84 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
87 public void setController(IController core) {
88 this.controller = core;
91 public void unsetController(IController core) {
92 if (this.controller == core) {
93 this.controller = null;
97 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
98 connectionOutService = s;
101 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
102 if (connectionOutService == s) {
103 connectionOutService = null;
107 public void setFlowProgrammerNotifier(Map<String, ?> props,
108 IFlowProgrammerNotifier s) {
109 if (props == null || props.get("containerName") == null) {
110 log.error("Didn't receive the service correct properties");
113 String containerName = (String) props.get("containerName");
114 this.flowProgrammerNotifiers.put(containerName, s);
117 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
118 IFlowProgrammerNotifier s) {
119 if (props == null || props.get("containerName") == null) {
120 log.error("Didn't receive the service correct properties");
123 String containerName = (String) props.get("containerName");
124 if (this.flowProgrammerNotifiers != null
125 && this.flowProgrammerNotifiers.containsKey(containerName)
126 && this.flowProgrammerNotifiers.get(containerName) == s) {
127 this.flowProgrammerNotifiers.remove(containerName);
132 * Function called by the dependency manager when all the required
133 * dependencies are satisfied
137 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
138 this.controller.addMessageListener(OFType.ERROR, this);
139 registerWithOSGIConsole();
 * Function called by the dependency manager when at least one dependency
 * becomes unsatisfied or when the component is shutting down, for example
 * because the bundle is being stopped.
152 * Function called by dependency manager after "init ()" is called and after
153 * the services provided by the class are registered in the service registry
160 * Function called by the dependency manager before the services exported by
161 * the component are unregistered, this will be followed by a "destroy ()"
169 public Status addFlow(Node node, Flow flow) {
170 if (!connectionOutService.isLocal(node)) {
171 log.debug("Add flow will not be processed in a non-master controller for node " + node);
172 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
175 return addFlowInternal(node, flow, 0);
179 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
180 if (!connectionOutService.isLocal(node)) {
181 log.debug("Modify flow will not be processed in a non-master controller for node " + node);
182 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
185 return modifyFlowInternal(node, oldFlow, newFlow, 0);
189 public Status removeFlow(Node node, Flow flow) {
190 if (!connectionOutService.isLocal(node)) {
191 log.debug("Remove flow will not be processed in a non-master controller for node " + node);
192 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
195 return removeFlowInternal(node, flow, 0);
199 public Status addFlowAsync(Node node, Flow flow, long rid) {
200 if (!connectionOutService.isLocal(node)) {
201 log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
202 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
205 return addFlowInternal(node, flow, rid);
209 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
211 if (!connectionOutService.isLocal(node)) {
212 log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
213 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
216 return modifyFlowInternal(node, oldFlow, newFlow, rid);
220 public Status removeFlowAsync(Node node, Flow flow, long rid) {
221 if (!connectionOutService.isLocal(node)) {
222 log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
223 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
226 return removeFlowInternal(node, flow, rid);
229 private Status addFlowInternal(Node node, Flow flow, long rid) {
230 String action = "add";
231 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
232 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
233 action, "Invalid node type"));
236 if (controller != null) {
237 ISwitch sw = controller.getSwitch((Long) node.getID());
239 FlowConverter x = new FlowConverter(flow);
240 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
245 * Synchronous message send. Each message is followed by a
248 result = sw.syncSend(msg);
251 * Message will be sent asynchronously. A Barrier message
252 * will be inserted automatically to synchronize the
255 result = asyncMsgSend(node, sw, msg, rid);
257 return getStatusInternal(result, action, rid);
259 return new Status(StatusCode.GONE, errorString("send", action,
260 "Switch is not available"));
263 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
264 "Internal plugin error"));
267 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
268 String action = "modify";
269 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
270 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
271 action, "Invalid node type"));
273 if (controller != null) {
274 ISwitch sw = controller.getSwitch((Long) node.getID());
276 OFMessage msg1 = null, msg2 = null;
278 // If priority and match portion are the same, send a
279 // modification message
280 if (oldFlow.getPriority() != newFlow.getPriority()
281 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
282 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
283 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
284 msg2 = new FlowConverter(newFlow).getOFFlowMod(
285 OFFlowMod.OFPFC_ADD, null);
287 msg1 = new FlowConverter(newFlow).getOFFlowMod(
288 OFFlowMod.OFPFC_MODIFY_STRICT, null);
291 * Synchronous message send
293 action = (msg2 == null) ? "modify" : "delete";
297 * Synchronous message send. Each message is followed by a
300 result = sw.syncSend(msg1);
303 * Message will be sent asynchronously. A Barrier message
304 * will be inserted automatically to synchronize the
307 result = asyncMsgSend(node, sw, msg1, rid);
310 Status rv = getStatusInternal(result, action, rid);
311 if ((msg2 == null) || !rv.isSuccess()) {
318 * Synchronous message send. Each message is followed by a
321 result = sw.syncSend(msg2);
324 * Message will be sent asynchronously. A Barrier message
325 * will be inserted automatically to synchronize the
328 result = asyncMsgSend(node, sw, msg2, rid);
330 return getStatusInternal(result, action, rid);
332 return new Status(StatusCode.GONE, errorString("send", action,
333 "Switch is not available"));
336 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
337 "Internal plugin error"));
340 private Status removeFlowInternal(Node node, Flow flow, long rid) {
341 String action = "remove";
342 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
343 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
344 action, "Invalid node type"));
346 if (controller != null) {
347 ISwitch sw = controller.getSwitch((Long) node.getID());
349 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
350 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
354 * Synchronous message send. Each message is followed by a
357 result = sw.syncSend(msg);
360 * Message will be sent asynchronously. A Barrier message
361 * will be inserted automatically to synchronize the
364 result = asyncMsgSend(node, sw, msg, rid);
366 return getStatusInternal(result, action, rid);
368 return new Status(StatusCode.GONE, errorString("send", action,
369 "Switch is not available"));
372 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
373 "Internal plugin error"));
377 public Status removeAllFlows(Node node) {
378 if (!connectionOutService.isLocal(node)) {
379 log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
380 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
383 return new Status(StatusCode.SUCCESS);
386 private String errorString(String phase, String action, String cause) {
388 + ((phase != null) ? phase + " the " + action
389 + " flow message: " : action + " the flow: ") + cause;
393 public void receive(ISwitch sw, OFMessage msg) {
394 if (msg instanceof OFFlowRemoved) {
395 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
396 } else if (msg instanceof OFError) {
397 handleErrorMessage(sw, (OFError) msg);
401 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
402 Node node = NodeCreator.createOFNode(sw.getId());
403 Flow flow = new FlowConverter(msg.getMatch(),
404 new ArrayList<OFAction>(0)).getFlow(node);
405 flow.setPriority(msg.getPriority());
406 flow.setIdleTimeout(msg.getIdleTimeout());
407 flow.setId(msg.getCookie());
409 Match match = flow.getMatch();
410 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
411 .getField(MatchType.IN_PORT).getValue() : null;
413 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
415 String container = containerNotifier.getKey();
416 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
418 * Switch only provide us with the match information. For now let's
419 * try to identify the container membership only from the input port
420 * match field. In any case, upper layer consumers can derive
421 * whether the notification was not for them. More sophisticated
422 * filtering can be added later on.
425 || container.equals(GlobalConstants.DEFAULT.toString())
426 || this.containerToNc.get(container).contains(inPort)) {
427 notifier.flowRemoved(node, flow);
432 private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
433 Node node = NodeCreator.createOFNode(sw.getId());
434 OFMessage offendingMsg = errorMsg.getOffendingMsg();
436 if (offendingMsg != null) {
437 xid = offendingMsg.getXid();
439 xid = errorMsg.getXid();
442 Long rid = getMessageRid(sw.getId(), xid);
444 * Null or zero requestId indicates that the error message is meant for
445 * a sync message. It will be handled by the sync message worker thread.
446 * Hence we are done here.
448 if ((rid == null) || (rid == 0)) {
453 * Notifies the caller that error has been reported for a previous flow
454 * programming request
456 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
458 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
459 notifier.flowErrorReported(node, rid, errorMsg);
464 public void tagUpdated(String containerName, Node n, short oldTag,
465 short newTag, UpdateType t) {
470 public void containerFlowUpdated(String containerName,
471 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
475 public void nodeConnectorUpdated(String containerName, NodeConnector p,
477 Set<NodeConnector> target = null;
481 if (!containerToNc.containsKey(containerName)) {
482 containerToNc.put(containerName, new HashSet<NodeConnector>());
484 containerToNc.get(containerName).add(p);
489 target = containerToNc.get(containerName);
490 if (target != null) {
499 public void containerModeUpdated(UpdateType t) {
504 public Status syncSendBarrierMessage(Node node) {
505 if (!connectionOutService.isLocal(node)) {
506 log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
507 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
510 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
511 return new Status(StatusCode.NOTACCEPTABLE,
512 "The node does not support Barrier message.");
515 if (controller != null) {
516 long swid = (Long) node.getID();
517 ISwitch sw = controller.getSwitch(swid);
519 sw.syncSendBarrierMessage();
521 return (new Status(StatusCode.SUCCESS));
523 return new Status(StatusCode.GONE,
524 "The node does not have a valid Switch reference.");
527 return new Status(StatusCode.INTERNALERROR,
528 "Failed to send Barrier message.");
532 public Status asyncSendBarrierMessage(Node node) {
533 if (!connectionOutService.isLocal(node)) {
534 log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
535 return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
538 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
539 return new Status(StatusCode.NOTACCEPTABLE,
540 "The node does not support Barrier message.");
543 if (controller != null) {
544 long swid = (Long) node.getID();
545 ISwitch sw = controller.getSwitch(swid);
547 sw.asyncSendBarrierMessage();
549 return (new Status(StatusCode.SUCCESS));
551 return new Status(StatusCode.GONE,
552 "The node does not have a valid Switch reference.");
555 return new Status(StatusCode.INTERNALERROR,
556 "Failed to send Barrier message.");
560 * This method sends the message asynchronously until the number of messages
561 * sent reaches a threshold. Then a Barrier message is sent automatically
562 * for sync purpose. An unique Request ID associated with the message is
563 * passed down by the caller. The Request ID will be returned to the caller
564 * when an error message is received from the switch.
571 * The OF message to be sent
576 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
577 Object result = Boolean.TRUE;
578 long swid = (Long) node.getID();
581 xid = sw.asyncSend(msg);
582 addXid2Rid(swid, xid, rid);
584 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
585 if (swxid2rid == null) {
589 int size = swxid2rid.size();
590 if (size % barrierMessagePriorCount == 0) {
591 result = asyncSendBarrierMessage(node);
598 * A number of async messages are sent followed by a synchronous Barrier
599 * message. This method returns the maximum async messages that can be sent
600 * before the Barrier message.
602 * @return The max count of async messages sent prior to Barrier message
604 private int getBarrierMessagePriorCount() {
605 String count = System.getProperty("of.barrierMessagePriorCount");
610 rv = Integer.parseInt(count);
611 } catch (Exception e) {
619 * This method returns the message Request ID previously assigned by the
620 * caller for a given OF message xid
626 * @return The Request ID
628 private Long getMessageRid(long swid, Integer xid) {
635 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
636 if (swxid2rid != null) {
637 rid = swxid2rid.get(xid);
 * This method returns a copy of outstanding xid to rid mappings for a given
648 * @return a copy of xid2rid mappings
650 public Map<Integer, Long> getSwXid2Rid(long swid) {
651 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
653 if (swxid2rid != null) {
654 return new HashMap<Integer, Long>(swxid2rid);
656 return new HashMap<Integer, Long>();
661 * Adds xid to rid mapping to the local DB
668 * The message Request ID
670 private void addXid2Rid(long swid, int xid, long rid) {
671 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
672 if (swxid2rid != null) {
673 swxid2rid.put(xid, rid);
678 * When an Error message is received, this method will be invoked to remove
679 * the offending xid from the local DB.
686 private void removeXid2Rid(long swid, int xid) {
687 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
688 if (swxid2rid != null) {
689 swxid2rid.remove(xid);
694 * Convert various result into Status
697 * The returned result from previous action
699 * add/modify/delete flow action
701 * The Request ID associated with the flow message
704 private Status getStatusInternal(Object result, String action, long rid) {
705 if (result instanceof Boolean) {
706 return ((Boolean) result == Boolean.TRUE) ? new Status(
707 StatusCode.SUCCESS, rid) : new Status(
708 StatusCode.TIMEOUT, errorString(null, action,
709 "Request Timed Out"));
710 } else if (result instanceof Status) {
711 return (Status) result;
712 } else if (result instanceof OFError) {
713 OFError res = (OFError) result;
714 return new Status(StatusCode.INTERNALERROR, errorString(
715 "program", action, Utils.getOFErrorString(res)));
717 return new Status(StatusCode.INTERNALERROR, errorString(
718 "send", action, "Internal Error"));
723 * When a Barrier reply is received, this method will be invoked to clear
729 private void clearXid2Rid(long swid) {
730 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
731 if (swxid2rid != null) {
737 public void updateNode(Node node, UpdateType type, Set<Property> props) {
738 long swid = (Long)node.getID();
742 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
743 this.xid2rid.put(swid, swxid2rid);
748 this.xid2rid.remove(swid);
755 public void updateNodeConnector(NodeConnector nodeConnector,
756 UpdateType type, Set<Property> props) {
759 private void registerWithOSGIConsole() {
760 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
762 bundleContext.registerService(CommandProvider.class.getName(), this,
767 public String getHelp() {
768 StringBuffer help = new StringBuffer();
769 help.append("-- Flow Programmer Service --\n");
770 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
771 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
772 return help.toString();
775 public void _px2r(CommandInterpreter ci) {
776 String st = ci.nextArgument();
778 ci.println("Please enter a valid node id");
784 sid = HexEncode.stringToLong(st);
785 } catch (NumberFormatException e) {
786 ci.println("Please enter a valid node id");
790 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
791 if (swxid2rid == null) {
792 ci.println("The node id entered does not exist");
796 ci.println("xid rid");
798 Set<Integer> xidSet = swxid2rid.keySet();
799 if (xidSet == null) {
803 for (Integer xid : xidSet) {
804 ci.println(xid + " " + swxid2rid.get(xid));
808 public void _px2rc(CommandInterpreter ci) {
809 ci.println("Max num of async messages sent prior to the Barrier message is "
810 + barrierMessagePriorCount);
816 public Future<RpcResult<AddFlowOutput>> addFlow(AddFlowInput input) {
817 throw new UnsupportedOperationException("Not implemented yet.");
822 public Future<RpcResult<RemoveFlowOutput>> removeFlow(RemoveFlowInput input) {
823 throw new UnsupportedOperationException("Not implemented yet.");
827 public Future<RpcResult<UpdateFlowOutput>> updateFlow(UpdateFlowInput input) {
828 throw new UnsupportedOperationException("Not implemented yet.");