2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.HashSet;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
20 import org.eclipse.osgi.framework.console.CommandInterpreter;
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
23 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
27 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Error;
28 import org.openflow.protocol.OFError;
29 import org.openflow.protocol.OFFlowMod;
30 import org.openflow.protocol.OFFlowRemoved;
31 import org.openflow.protocol.OFMessage;
32 import org.openflow.protocol.OFPort;
33 import org.openflow.protocol.OFType;
34 import org.openflow.protocol.action.OFAction;
36 import org.opendaylight.controller.sal.core.ContainerFlow;
37 import org.opendaylight.controller.sal.core.IContainerListener;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.Node.NodeIDType;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
45 import org.opendaylight.controller.sal.match.Match;
46 import org.opendaylight.controller.sal.match.MatchType;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.opendaylight.controller.sal.utils.HexEncode;
49 import org.opendaylight.controller.sal.utils.NodeCreator;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.controller.sal.utils.Status;
52 import org.osgi.framework.BundleContext;
53 import org.osgi.framework.FrameworkUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * Represents the openflow plugin component in charge of programming the flows
59 * on the switches and of relaying flow notifications to functional modules above SAL.
61 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
62 IMessageListener, IContainerListener, IInventoryShimExternalListener,
// Class-wide logger.
64 private static final Logger log = LoggerFactory
65 .getLogger(FlowProgrammerService.class);
// Controller service; used to look up switches and to register this object
// as a message listener. Assigned by setController()/unsetController().
66 private IController controller;
// Container name -> notifier that receives flowRemoved() callbacks.
67 private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
// Container name -> node connectors recorded by nodeConnectorUpdated().
// NOTE(review): declared as a plain Map; no initialization is visible in this extract.
68 private Map<String, Set<NodeConnector>> containerToNc;
// Switch id -> (message xid -> caller-supplied request id) for messages sent
// through asyncMsgSend().
69 private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
// asyncMsgSend() sends a Barrier whenever the per-switch table size is a
// multiple of this value. The initializer runs before the constructor body.
70 private int barrierMessagePriorCount = getBarrierMessagePriorCount();
/**
 * Creates the service with empty notifier and xid-to-rid registries.
 *
 * NOTE(review): no assignment to containerToNc is visible in this extract,
 * yet nodeConnectorUpdated() and handleFlowRemovedMessage() dereference it;
 * confirm it is initialized before those callbacks can run.
 */
72 public FlowProgrammerService() {
74 flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
75 xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
78 public void setController(IController core) {
79 this.controller = core;
82 public void unsetController(IController core) {
83 if (this.controller == core) {
84 this.controller = null;
/**
 * Registers the flow programmer notifier of a container.
 *
 * @param props
 *            service properties; expected to carry a non-null
 *            "containerName" entry
 * @param s
 *            the notifier to store under that container name
 */
88 public void setFlowProgrammerNotifier(Map<String, ?> props,
89 IFlowProgrammerNotifier s) {
90 if (props == null || props.get("containerName") == null) {
91 log.error("Didn't receive the service correct properties");
// NOTE(review): the lines closing this guard are not visible in this extract.
// Presumably it returns here; otherwise a null props would fail on the next
// statement. Confirm.
94 String containerName = (String) props.get("containerName");
// A later registration for the same container replaces the earlier one.
95 this.flowProgrammerNotifiers.put(containerName, s);
/**
 * Unregisters the flow programmer notifier of a container, but only when the
 * stored notifier is the very instance being withdrawn.
 *
 * @param props
 *            service properties; expected to carry a non-null
 *            "containerName" entry
 * @param s
 *            the notifier being withdrawn
 */
98 public void unsetFlowProgrammerNotifier(Map<String, ?> props,
99 IFlowProgrammerNotifier s) {
100 if (props == null || props.get("containerName") == null) {
101 log.error("Didn't receive the service correct properties");
// NOTE(review): the lines closing this guard are not visible in this extract;
// presumably it returns here. Confirm.
104 String containerName = (String) props.get("containerName");
// NOTE(review): containsKey/get/remove are three separate map operations, so
// another thread could replace the entry in between. ConcurrentMap.remove(key,
// value) is atomic but compares with equals() rather than ==.
105 if (this.flowProgrammerNotifiers != null
106 && this.flowProgrammerNotifiers.containsKey(containerName)
107 && this.flowProgrammerNotifiers.get(containerName) == s) {
108 this.flowProgrammerNotifiers.remove(containerName);
113 * Function called by the dependency manager when all the required
114 * dependencies are satisfied
// Body of the dependency-manager initialization callback (its signature line
// is not visible in this extract). Subscribes this object to FLOW_REMOVED
// messages and registers it with the OSGi console.
// NOTE(review): this.controller is dereferenced without a null check; this
// relies on setController() having been called first. Confirm.
118 this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
119 registerWithOSGIConsole();
123 * Function called by the dependency manager when at least one dependency
124 * becomes unsatisfied or when the component is shutting down because, for
125 * example, the bundle is being stopped.
132 * Function called by dependency manager after "init ()" is called and after
133 * the services provided by the class are registered in the service registry
140 * Function called by the dependency manager before the services exported by
141 * the component are unregistered, this will be followed by a "destroy ()"
149 public Status addFlow(Node node, Flow flow) {
150 return addFlowInternal(node, flow, 0);
154 public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
155 return modifyFlowInternal(node, oldFlow, newFlow, 0);
159 public Status removeFlow(Node node, Flow flow) {
160 return removeFlowInternal(node, flow, 0);
164 public Status addFlowAsync(Node node, Flow flow, long rid) {
165 return addFlowInternal(node, flow, rid);
// Replaces a flow on the given node, forwarding the caller's request id to
// modifyFlowInternal() and returning its Status unchanged.
// NOTE(review): the rest of the parameter list (the declaration of rid) is
// not visible in this extract.
169 public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
171 return modifyFlowInternal(node, oldFlow, newFlow, rid);
175 public Status removeFlowAsync(Node node, Flow flow, long rid) {
176 return removeFlowInternal(node, flow, rid);
/**
 * Converts the flow into an OFPFC_ADD flow-mod, sends it to the switch behind
 * the node and maps the reply onto a Status.
 *
 * Outcomes visible here: NOTACCEPTABLE for a non-OPENFLOW node; SUCCESS or
 * TIMEOUT for a Boolean reply; INTERNALERROR for an OFError reply or any other
 * reply type; GONE and a final INTERNALERROR whose messages point at a missing
 * switch and a missing controller respectively.
 *
 * NOTE(review): several lines (the declaration of result and the conditions
 * choosing between syncSend and asyncMsgSend) are not visible in this extract.
 *
 * @param node
 *            the node to program
 * @param flow
 *            the flow to add
 * @param rid
 *            the request id handed to asyncMsgSend() on the asynchronous path
 * @return the Status describing the outcome
 */
179 private Status addFlowInternal(Node node, Flow flow, long rid) {
180 String action = "add";
181 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
182 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
183 action, "Invalid node type"));
186 if (controller != null) {
187 ISwitch sw = controller.getSwitch((Long) node.getID());
189 FlowConverter x = new FlowConverter(flow);
190 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
195 * Synchronous message send. Each message is followed by a
198 result = sw.syncSend(msg);
201 * Message will be sent asynchronously. A Barrier message
202 * will be inserted automatically to synchronize the
205 result = asyncMsgSend(node, sw, msg, rid);
// NOTE(review): "== Boolean.TRUE" below is an identity comparison on a boxed
// value; a Boolean created with "new Boolean(true)" would be reported as a
// timeout. Boolean.TRUE.equals(result) would not depend on how it was boxed.
207 if (result instanceof Boolean) {
208 return ((Boolean) result == Boolean.TRUE) ? new Status(
209 StatusCode.SUCCESS, null) : new Status(
210 StatusCode.TIMEOUT, errorString(null, action,
211 "Request Timed Out"));
212 } else if (result instanceof OFError) {
213 OFError res = (OFError) result;
// Nicira vendor errors get a fixed message; what is done with er and bb
// is on lines not visible in this extract.
214 if (res.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
215 V6Error er = new V6Error(res);
216 byte[] b = res.getError();
217 ByteBuffer bb = ByteBuffer.allocate(b.length);
221 return new Status(StatusCode.INTERNALERROR,
222 errorString("program", action,
223 "Vendor Extension Internal Error"));
225 return new Status(StatusCode.INTERNALERROR, errorString(
226 "program", action, Utils.getOFErrorString(res)));
// Reply is neither a Boolean nor an OFError.
228 return new Status(StatusCode.INTERNALERROR, errorString(
229 "send", action, "Internal Error"));
232 return new Status(StatusCode.GONE, errorString("send", action,
233 "Switch is not available"));
236 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
237 "Internal plugin error"));
/**
 * Replaces oldFlow with newFlow on the switch behind the node.
 *
 * When priority or match differ, two messages are built: an
 * OFPFC_DELETE_STRICT for oldFlow followed by an OFPFC_ADD for newFlow.
 * Otherwise a single OFPFC_MODIFY_STRICT for newFlow is built. Replies are
 * mapped onto a Status the same way as for the add and remove operations.
 *
 * NOTE(review): the delete and the add are separate messages; if the first
 * succeeds and the second fails, the old flow is already gone. Several
 * control-flow lines are not visible in this extract.
 *
 * @param node
 *            the node to program
 * @param oldFlow
 *            the flow currently installed
 * @param newFlow
 *            the flow that should take its place
 * @param rid
 *            the request id handed to asyncMsgSend() on the asynchronous path
 * @return the Status describing the outcome
 */
240 private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
241 String action = "modify";
242 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
243 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
244 action, "Invalid node type"));
246 if (controller != null) {
247 ISwitch sw = controller.getSwitch((Long) node.getID());
249 OFMessage msg1 = null, msg2 = null;
// The existing comment below describes the second branch (MODIFY_STRICT);
// the condition itself tests for a difference and selects delete + add.
251 // If priority and match portion are the same, send a
252 // modification message
253 if (oldFlow.getPriority() != newFlow.getPriority()
254 || !oldFlow.getMatch().equals(newFlow.getMatch())) {
255 msg1 = new FlowConverter(oldFlow).getOFFlowMod(
256 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
257 msg2 = new FlowConverter(newFlow).getOFFlowMod(
258 OFFlowMod.OFPFC_ADD, null);
260 msg1 = new FlowConverter(newFlow).getOFFlowMod(
261 OFFlowMod.OFPFC_MODIFY_STRICT, null);
264 * Synchronous message send
// The error text for the first message names the step actually performed.
266 action = (msg2 == null) ? "modify" : "delete";
270 * Synchronous message send. Each message is followed by a
273 result = sw.syncSend(msg1);
276 * Message will be sent asynchronously. A Barrier message
277 * will be inserted automatically to synchronize the
280 result = asyncMsgSend(node, sw, msg1, rid);
// NOTE(review): "== Boolean.FALSE" is an identity comparison on a boxed value;
// Boolean.FALSE.equals(result) would not depend on how it was boxed.
282 if (result instanceof Boolean) {
283 if ((Boolean) result == Boolean.FALSE) {
284 return new Status(StatusCode.TIMEOUT, errorString(null,
285 action, "Request Timed Out"));
286 } else if (msg2 == null) {
// Single-message case: the modify succeeded and nothing is left to send.
287 return new Status(StatusCode.SUCCESS, null);
289 } else if (result instanceof OFError) {
290 return new Status(StatusCode.INTERNALERROR, errorString(
292 Utils.getOFErrorString((OFError) result)));
294 return new Status(StatusCode.INTERNALERROR, errorString(
295 "send", action, "Internal Error"));
// Second stage: send msg2 (the add) and map its reply.
302 * Synchronous message send. Each message is followed by a
305 result = sw.syncSend(msg2);
308 * Message will be sent asynchronously. A Barrier message
309 * will be inserted automatically to synchronize the
312 result = asyncMsgSend(node, sw, msg2, rid);
314 if (result instanceof Boolean) {
315 return ((Boolean) result == Boolean.TRUE) ? new Status(
316 StatusCode.SUCCESS, null) : new Status(
317 StatusCode.TIMEOUT, errorString(null, action,
318 "Request Timed Out"));
319 } else if (result instanceof OFError) {
320 return new Status(StatusCode.INTERNALERROR,
321 errorString("program", action, Utils
322 .getOFErrorString((OFError) result)));
324 return new Status(StatusCode.INTERNALERROR,
325 errorString("send", action, "Internal Error"));
329 return new Status(StatusCode.GONE, errorString("send", action,
330 "Switch is not available"));
333 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
334 "Internal plugin error"));
/**
 * Converts the flow into an OFPFC_DELETE_STRICT flow-mod (out port
 * OFPP_NONE), sends it to the switch behind the node and maps the reply onto
 * a Status.
 *
 * Outcomes visible here: NOTACCEPTABLE for a non-OPENFLOW node; SUCCESS or
 * TIMEOUT for a Boolean reply; INTERNALERROR for an OFError reply or any other
 * reply type; GONE and a final INTERNALERROR whose messages point at a missing
 * switch and a missing controller respectively.
 *
 * NOTE(review): the declaration of result and the conditions choosing between
 * syncSend and asyncMsgSend are not visible in this extract.
 *
 * @param node
 *            the node to program
 * @param flow
 *            the flow to remove
 * @param rid
 *            the request id handed to asyncMsgSend() on the asynchronous path
 * @return the Status describing the outcome
 */
337 private Status removeFlowInternal(Node node, Flow flow, long rid) {
338 String action = "remove";
339 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
340 return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
341 action, "Invalid node type"));
343 if (controller != null) {
344 ISwitch sw = controller.getSwitch((Long) node.getID());
346 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
347 OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
351 * Synchronous message send. Each message is followed by a
354 result = sw.syncSend(msg);
357 * Message will be sent asynchronously. A Barrier message
358 * will be inserted automatically to synchronize the
361 result = asyncMsgSend(node, sw, msg, rid);
// NOTE(review): "== Boolean.TRUE" is an identity comparison on a boxed value;
// Boolean.TRUE.equals(result) would not depend on how it was boxed.
363 if (result instanceof Boolean) {
364 return ((Boolean) result == Boolean.TRUE) ? new Status(
365 StatusCode.SUCCESS, null) : new Status(
366 StatusCode.TIMEOUT, errorString(null, action,
367 "Request Timed Out"));
368 } else if (result instanceof OFError) {
369 return new Status(StatusCode.INTERNALERROR, errorString(
371 Utils.getOFErrorString((OFError) result)));
// Reply is neither a Boolean nor an OFError.
373 return new Status(StatusCode.INTERNALERROR, errorString(
374 "send", action, "Internal Error"));
377 return new Status(StatusCode.GONE, errorString("send", action,
378 "Switch is not available"));
381 return new Status(StatusCode.INTERNALERROR, errorString("send", action,
382 "Internal plugin error"));
386 public Status removeAllFlows(Node node) {
387 return new Status(StatusCode.SUCCESS, null);
/**
 * Builds the error text carried by the Status objects of the flow operations.
 *
 * @param phase
 *            the failed step (callers in this file pass "send" or "program"),
 *            or null
 * @param action
 *            the flow operation, e.g. "add", "modify" or "remove"
 * @param cause
 *            description of the failure, appended last
 * @return with a phase: phase + " the " + action + " flow message: " + cause;
 *         without one: action + " the flow: " + cause. NOTE(review): the
 *         start of the return expression is not visible in this extract, so
 *         any leading text is not described here.
 */
390 private String errorString(String phase, String action, String cause) {
392 + ((phase != null) ? phase + " the " + action
393 + " flow message: " : action + " the flow: ") + cause;
397 public void receive(ISwitch sw, OFMessage msg) {
398 if (msg instanceof OFFlowRemoved) {
399 handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
/**
 * Turns a flow-removed message into a Flow and notifies the registered
 * per-container notifiers.
 *
 * @param sw
 *            the switch that reported the removal
 * @param msg
 *            the flow-removed message
 */
403 private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
404 Node node = NodeCreator.createOFNode(sw.getId());
// The message carries no actions, so the flow is rebuilt from the match and
// an empty action list; priority, idle timeout and cookie (as the flow id)
// are copied over from the message.
405 Flow flow = new FlowConverter(msg.getMatch(),
406 new ArrayList<OFAction>(0)).getFlow(node);
407 flow.setPriority(msg.getPriority());
408 flow.setIdleTimeout(msg.getIdleTimeout());
409 flow.setId(msg.getCookie());
// inPort stays null when the match has no IN_PORT field.
411 Match match = flow.getMatch();
412 NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
413 .getField(MatchType.IN_PORT).getValue() : null;
415 for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
417 String container = containerNotifier.getKey();
418 IFlowProgrammerNotifier notifier = containerNotifier.getValue();
420 * Switch only provide us with the match information. For now let's
421 * try to identify the container membership only from the input port
422 * match field. In any case, upper layer consumers can derive
423 * whether the notification was not for them. More sophisticated
424 * filtering can be added later on.
// NOTE(review): the first operand of this condition is not visible in this
// extract. containerToNc.get(container) returns null for a container with no
// recorded node connectors, in which case .contains() would throw a
// NullPointerException. Confirm a guard exists.
427 || container.equals(GlobalConstants.DEFAULT.toString())
428 || this.containerToNc.get(container).contains(inPort)) {
429 notifier.flowRemoved(node, flow);
// Container callback for a tag change on node n of the named container.
// NOTE(review): no body statements are visible in this extract.
435 public void tagUpdated(String containerName, Node n, short oldTag,
436 short newTag, UpdateType t) {
// Container callback for a container flow change in the named container.
// NOTE(review): no body statements are visible in this extract.
441 public void containerFlowUpdated(String containerName,
442 ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
// Container callback that maintains containerToNc, the record of which node
// connectors belong to which container.
// NOTE(review): the rest of the signature and the branching on the update
// type are not visible in this extract.
446 public void nodeConnectorUpdated(String containerName, NodeConnector p,
448 Set<NodeConnector> target = null;
// First visible branch: create the container's set on first use, then add p.
452 if (!containerToNc.containsKey(containerName)) {
453 containerToNc.put(containerName, new HashSet<NodeConnector>());
455 containerToNc.get(containerName).add(p);
// Second visible branch: look up the container's set and act on it only when
// it exists (the action itself is not visible here).
460 target = containerToNc.get(containerName);
461 if (target != null) {
// Container callback for a container mode change.
// NOTE(review): no body statements are visible in this extract.
470 public void containerModeUpdated(UpdateType t) {
/**
 * Sends a Barrier message to the switch behind the given node.
 *
 * @param node
 *            the node to send the Barrier to
 * @return NOTACCEPTABLE for a non-OPENFLOW node; SUCCESS after the Barrier was
 *         sent; GONE and INTERNALERROR with the messages shown below (the
 *         conditions selecting them are not visible in this extract)
 */
475 public Status sendBarrierMessage(Node node) {
476 if (!node.getType().equals(NodeIDType.OPENFLOW)) {
477 return new Status(StatusCode.NOTACCEPTABLE,
478 "The node does not support Barrier message.");
481 if (controller != null) {
482 long swid = (Long) node.getID();
483 ISwitch sw = controller.getSwitch(swid);
// NOTE(review): as far as visible here, the value returned by
// sw.sendBarrierMessage() is discarded and SUCCESS is reported regardless.
485 sw.sendBarrierMessage();
487 return (new Status(StatusCode.SUCCESS, null));
489 return new Status(StatusCode.GONE,
490 "The node does not have a valid Switch reference.");
493 return new Status(StatusCode.INTERNALERROR,
494 "Failed to send Barrier message.");
498 * This method sends the message asynchronously until the number of messages
499 * sent reaches a threshold. Then a Barrier message is sent automatically
500 * for synchronization purposes. A unique Request ID associated with the message is
501 * passed down by the caller. The Request ID will be returned to the caller
502 * when an error message is received from the switch.
509 * The OF message to be sent
514 private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
// Default reply; it is what callers see unless a Barrier is sent below.
515 Object result = Boolean.TRUE;
516 long swid = (Long) node.getID();
// Record which request id the freshly sent message's xid belongs to.
519 xid = sw.asyncSend(msg);
520 addXid2Rid(swid, xid, rid);
522 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
523 if (swxid2rid == null) {
527 int size = swxid2rid.size();
// NOTE(review): a barrierMessagePriorCount of 0 would make this modulo throw
// ArithmeticException. Also, sendBarrierMessage() returns a Status, while the
// callers in this file only recognise Boolean and OFError replies and map
// anything else to "Internal Error". Confirm the lines not visible here
// convert the Status.
528 if (size % barrierMessagePriorCount == 0) {
529 result = sendBarrierMessage(node);
536 * A number of async messages are sent followed by a synchronous Barrier
537 * message. This method returns the maximum async messages that can be sent
538 * before the Barrier message.
540 * @return The max count of async messages sent prior to Barrier message
542 private int getBarrierMessagePriorCount() {
// The threshold is configurable through this JVM system property.
543 String count = System.getProperty("of.barrierMessagePriorCount");
// NOTE(review): the default value and the handling of a missing property are
// not visible in this extract. Integer.parseInt only throws
// NumberFormatException, so the broad catch could be narrowed. A parsed value
// of 0 or below is not visibly rejected although it is later used as a
// modulo divisor.
548 rv = Integer.parseInt(count);
549 } catch (Exception e) {
557 * This method returns the message Request ID previously assigned by the
558 * caller for a given OF message xid
564 * @return The Request ID
566 public long getMessageRid(long swid, int xid) {
567 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
// NOTE(review): the declaration of rid is not visible in this extract. If it
// is a primitive long, an xid with no entry makes get() return null and the
// unboxing throws NullPointerException. Confirm, or test for null before
// assigning.
570 if (swxid2rid != null) {
571 rid = swxid2rid.get(xid);
577 * This method returns a copy of outstanding xid to rid mappings.for a given
582 * @return a copy of xid2rid mappings
584 public Map<Integer, Long> getSwXid2Rid(long swid) {
585 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
587 if (swxid2rid != null) {
588 return new HashMap<Integer, Long>(swxid2rid);
590 return new HashMap<Integer, Long>();
595 * Adds xid to rid mapping to the local DB
602 * The message Request ID
604 private void addXid2Rid(long swid, int xid, long rid) {
605 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
606 if (swxid2rid != null) {
607 swxid2rid.put(xid, rid);
612 * When an Error message is received, this method will be invoked to remove
613 * the offending xid from the local DB.
620 private void removeXid2Rid(long swid, int xid) {
621 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
622 if (swxid2rid != null) {
623 swxid2rid.remove(xid);
628 * When a Barrier reply is received, this method will be invoked to clear
634 private void clearXid2Rid(long swid) {
635 Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
// Acts only when a table exists for the switch.
// NOTE(review): the guarded statement is not visible in this extract.
636 if (swxid2rid != null) {
// Inventory callback that maintains the per-switch xid-to-rid table.
// NOTE(review): the update types selecting each action are not visible in
// this extract.
642 public void updateNode(Node node, UpdateType type, Set<Property> props) {
643 long swid = (Long)node.getID();
// One visible action stores a fresh, empty table under the switch id.
// NOTE(review): the inner table is a plain HashMap inside a ConcurrentMap;
// if it is read and written from different threads it needs its own
// synchronization. Confirm the threading model.
647 Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
648 this.xid2rid.put(swid, swxid2rid);
// The other visible action drops the table of the switch.
653 this.xid2rid.remove(swid);
// Inventory callback for a node connector update.
// NOTE(review): no body statements are visible in this extract.
660 public void updateNodeConnector(NodeConnector nodeConnector,
661 UpdateType type, Set<Property> props) {
// Registers this object as a CommandProvider service through the bundle
// context of the bundle that loaded this class.
// NOTE(review): the remaining call arguments are not visible in this extract.
// FrameworkUtil.getBundle() returns null for a class not loaded by an OSGi
// bundle (for example in a plain unit test), which would fail here.
664 private void registerWithOSGIConsole() {
665 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
667 bundleContext.registerService(CommandProvider.class.getName(), this,
672 public String getHelp() {
673 StringBuffer help = new StringBuffer();
674 help.append("-- Flow Programmer Service --\n");
675 help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
676 help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
677 return help.toString();
/**
 * Console command "px2r": prints the outstanding xid to rid mappings of the
 * switch whose id is given as the command argument.
 *
 * @param ci
 *            the console interpreter supplying the argument and receiving
 *            the output
 */
680 public void _px2r(CommandInterpreter ci) {
681 String st = ci.nextArgument();
// NOTE(review): the condition guarding this message is not visible in this
// extract.
683 ci.println("Please enter a valid node id");
// The node id is parsed with HexEncode.stringToLong; malformed input is
// reported to the user.
689 sid = HexEncode.stringToLong(st);
690 } catch (NumberFormatException e) {
691 ci.println("Please enter a valid node id");
695 Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
696 if (swxid2rid == null) {
697 ci.println("The node id entered does not exist");
701 ci.println("xid rid");
// NOTE(review): Map.keySet() never returns null, so the check below cannot
// trigger. Iterating entrySet() would also avoid the second lookup per key.
703 Set<Integer> xidSet = swxid2rid.keySet();
704 if (xidSet == null) {
708 for (Integer xid : xidSet) {
709 ci.println(xid + " " + swxid2rid.get(xid));
713 public void _px2rc(CommandInterpreter ci) {
714 ci.println("Max num of async messages sent prior to the Barrier message is "
715 + barrierMessagePriorCount);