-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
package org.opendaylight.controller.protocol_plugin.openflow.internal;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.eclipse.osgi.framework.console.CommandInterpreter;
+import org.eclipse.osgi.framework.console.CommandProvider;
+import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
+import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Error;
-import org.openflow.protocol.OFError;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPort;
-
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
+import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
+import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.Node.NodeIDType;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.flowprogrammer.Flow;
import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
-import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.sal.match.Match;
+import org.opendaylight.controller.sal.match.MatchType;
+import org.opendaylight.controller.sal.utils.EtherTypes;
+import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.HexEncode;
+import org.opendaylight.controller.sal.utils.NodeCreator;
import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.openflow.protocol.OFError;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFFlowRemoved;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents the openflow plugin component in charge of programming the flows
- * on the switch. It servers the install requests coming from the SAL layer.
- *
- *
- *
+ * the flow programming and relay them to functional modules above SAL.
*/
-public class FlowProgrammerService implements IPluginInFlowProgrammerService {
- private static final Logger log = LoggerFactory
- .getLogger(FlowProgrammerService.class);
+public class FlowProgrammerService implements IPluginInFlowProgrammerService,
+ IMessageListener, IContainerListener, IInventoryShimExternalListener,
+ CommandProvider, IContainerAware {
+ private static final Logger log = LoggerFactory
+ .getLogger(FlowProgrammerService.class);
private IController controller;
+ private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
+ private Map<String, Set<NodeConnector>> containerToNc;
+ private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
+ private int barrierMessagePriorCount = getBarrierMessagePriorCount();
+ private IPluginOutConnectionService connectionOutService;
public FlowProgrammerService() {
controller = null;
+ flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
+ containerToNc = new HashMap<String, Set<NodeConnector>>();
+ xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
}
public void setController(IController core) {
}
}
+ void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+ connectionOutService = s;
+ }
+
+ void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+ if (connectionOutService == s) {
+ connectionOutService = null;
+ }
+ }
+
+ public void setFlowProgrammerNotifier(Map<String, ?> props,
+ IFlowProgrammerNotifier s) {
+ if (props == null || props.get("containerName") == null) {
+ log.error("Didn't receive the service correct properties");
+ return;
+ }
+ String containerName = (String) props.get("containerName");
+ this.flowProgrammerNotifiers.put(containerName, s);
+ }
+
+ public void unsetFlowProgrammerNotifier(Map<String, ?> props,
+ IFlowProgrammerNotifier s) {
+ if (props == null || props.get("containerName") == null) {
+ log.error("Didn't receive the service correct properties");
+ return;
+ }
+ String containerName = (String) props.get("containerName");
+ if (this.flowProgrammerNotifiers != null
+ && this.flowProgrammerNotifiers.containsKey(containerName)
+ && this.flowProgrammerNotifiers.get(containerName) == s) {
+ this.flowProgrammerNotifiers.remove(containerName);
+ }
+ }
+
/**
* Function called by the dependency manager when all the required
* dependencies are satisfied
*
*/
void init() {
+ this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
+ this.controller.addMessageListener(OFType.ERROR, this);
+ registerWithOSGIConsole();
}
/**
- * Function called by the dependency manager when at least one
- * dependency become unsatisfied or when the component is shutting
- * down because for example bundle is being stopped.
+ * Function called by the dependency manager when at least one dependency
+ * become unsatisfied or when the component is shutting down because for
+ * example bundle is being stopped.
*
*/
void destroy() {
}
/**
- * Function called by dependency manager after "init ()" is called
- * and after the services provided by the class are registered in
- * the service registry
+ * Function called by dependency manager after "init ()" is called and after
+ * the services provided by the class are registered in the service registry
*
*/
void start() {
}
/**
- * Function called by the dependency manager before the services
- * exported by the component are unregistered, this will be
- * followed by a "destroy ()" calls
+ * Function called by the dependency manager before the services exported by
+ * the component are unregistered, this will be followed by a "destroy ()"
+ * calls
*
*/
void stop() {
@Override
public Status addFlow(Node node, Flow flow) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Add flow will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return addFlowInternal(node, flow, 0);
+ }
+
+ @Override
+ public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Modify flow will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return modifyFlowInternal(node, oldFlow, newFlow, 0);
+ }
+
+ @Override
+ public Status removeFlow(Node node, Flow flow) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Remove flow will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return removeFlowInternal(node, flow, 0);
+ }
+
+ @Override
+ public Status addFlowAsync(Node node, Flow flow, long rid) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return addFlowInternal(node, flow, rid);
+ }
+
+ @Override
+ public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
+ long rid) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return modifyFlowInternal(node, oldFlow, newFlow, rid);
+ }
+
+ @Override
+ public Status removeFlowAsync(Node node, Flow flow, long rid) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return removeFlowInternal(node, flow, rid);
+ }
+
+ private Status addFlowInternal(Node node, Flow flow, long rid) {
String action = "add";
if (!node.getType().equals(NodeIDType.OPENFLOW)) {
- return new Status(StatusCode.NOTACCEPTABLE,
- errorString("send", action, "Invalid node type"));
+ return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
+ action, "Invalid node type"));
+ }
+
+ Status status = validateFlow(flow);
+ if (!status.isSuccess()) {
+ return status;
}
if (controller != null) {
FlowConverter x = new FlowConverter(flow);
OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
- /*
- * Synchronous message send
- */
- Object result = sw.syncSend(msg);
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ?
- new Status(StatusCode.SUCCESS, null)
- : new Status(StatusCode.TIMEOUT,
- errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- OFError res = (OFError) result;
- if (res.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
- V6Error er = new V6Error(res);
- byte[] b = res.getError();
- ByteBuffer bb = ByteBuffer.allocate(b.length);
- bb.put(b);
- bb.rewind();
- er.readFrom(bb);
- log.trace("V6Error {}",er);
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, "Vendor Extension Internal Error"));
- }
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, Utils
- .getOFErrorString(res)));
+ Object result;
+ if (rid == 0) {
+ /*
+ * Synchronous message send. Each message is followed by a
+ * Barrier message.
+ */
+ result = sw.syncSend(msg);
} else {
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal Error"));
+ /*
+ * Message will be sent asynchronously. A Barrier message
+ * will be inserted automatically to synchronize the
+ * progression.
+ */
+ result = asyncMsgSend(node, sw, msg, rid);
}
+ return getStatusInternal(result, action, rid);
} else {
return new Status(StatusCode.GONE, errorString("send", action,
- "Switch is not available"));
+ "Switch is not available"));
}
}
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal plugin error"));
+ return new Status(StatusCode.INTERNALERROR, errorString("send", action,
+ "Internal plugin error"));
}
- @Override
- public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+ /*
+ * Method which runs openflow 1.0 specific validation on the requested flow
+ * This validation is needed because the openflow switch will silently accept
+ * the request and install only the applicable match fields
+ */
+ private Status validateFlow(Flow flow) {
+ Match m = flow.getMatch();
+ boolean isIPEthertypeSet = m.isPresent(MatchType.DL_TYPE)
+ && (m.getField(MatchType.DL_TYPE).getValue().equals(EtherTypes.IPv4.shortValue()) || m
+ .getField(MatchType.DL_TYPE).getValue().equals(EtherTypes.IPv6.shortValue()));
+
+ // network address check
+ if ((m.isPresent(MatchType.NW_SRC) || m.isPresent(MatchType.NW_DST)) && !isIPEthertypeSet) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The match on network source or destination address cannot be accepted if the match "
+ + "on proper ethertype is missing");
+ }
+
+ // transport protocol check
+ if (m.isPresent(MatchType.NW_PROTO) && !isIPEthertypeSet) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The match on network protocol cannot be accepted if the match on proper ethertype is missing");
+ }
+
+ // transport ports check
+ if ((m.isPresent(MatchType.TP_SRC) || m.isPresent(MatchType.TP_DST))
+ && (!isIPEthertypeSet || m.isAny(MatchType.NW_PROTO))) {
+ return new Status(
+ StatusCode.NOTACCEPTABLE,
+ "The match on transport source or destination port cannot be accepted if the match on network protocol and match on IP ethertype are missing");
+ }
+ return new Status(StatusCode.SUCCESS);
+ }
+
+ private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
String action = "modify";
if (!node.getType().equals(NodeIDType.OPENFLOW)) {
- return new Status(StatusCode.NOTACCEPTABLE,
- errorString("send", action, "Invalid node type"));
+ return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
+ action, "Invalid node type"));
+ }
+
+ Status status = validateFlow(newFlow);
+ if (!status.isSuccess()) {
+ return status;
}
+
if (controller != null) {
ISwitch sw = controller.getSwitch((Long) node.getID());
if (sw != null) {
OFMessage msg1 = null, msg2 = null;
- // If priority and match portion are the same, send a modification message
+ // If priority and match portion are the same, send a
+ // modification message
if (oldFlow.getPriority() != newFlow.getPriority()
|| !oldFlow.getMatch().equals(newFlow.getMatch())) {
msg1 = new FlowConverter(oldFlow).getOFFlowMod(
* Synchronous message send
*/
action = (msg2 == null) ? "modify" : "delete";
- Object result = sw.syncSend(msg1);
- if (result instanceof Boolean) {
- if ((Boolean) result == Boolean.FALSE) {
- return new Status(StatusCode.TIMEOUT,
- errorString(null, action,
- "Request Timed Out"));
- } else if (msg2 == null) {
- return new Status(StatusCode.SUCCESS, null);
- }
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, Utils
- .getOFErrorString((OFError) result)));
+ Object result;
+ if (rid == 0) {
+ /*
+ * Synchronous message send. Each message is followed by a
+ * Barrier message.
+ */
+ result = sw.syncSend(msg1);
} else {
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal Error"));
+ /*
+ * Message will be sent asynchronously. A Barrier message
+ * will be inserted automatically to synchronize the
+ * progression.
+ */
+ result = asyncMsgSend(node, sw, msg1, rid);
+ }
+
+ Status rv = getStatusInternal(result, action, rid);
+ if ((msg2 == null) || !rv.isSuccess()) {
+ return rv;
}
- if (msg2 != null) {
- action = "add";
+ action = "add";
+ if (rid == 0) {
+ /*
+ * Synchronous message send. Each message is followed by a
+ * Barrier message.
+ */
result = sw.syncSend(msg2);
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ?
- new Status(StatusCode.SUCCESS, null)
- : new Status(StatusCode.TIMEOUT,
- errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, Utils
- .getOFErrorString((OFError) result)));
- } else {
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal Error"));
- }
+ } else {
+ /*
+ * Message will be sent asynchronously. A Barrier message
+ * will be inserted automatically to synchronize the
+ * progression.
+ */
+ result = asyncMsgSend(node, sw, msg2, rid);
}
+ return getStatusInternal(result, action, rid);
} else {
return new Status(StatusCode.GONE, errorString("send", action,
"Switch is not available"));
}
}
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal plugin error"));
+ return new Status(StatusCode.INTERNALERROR, errorString("send", action,
+ "Internal plugin error"));
}
- @Override
- public Status removeFlow(Node node, Flow flow) {
+ private Status removeFlowInternal(Node node, Flow flow, long rid) {
String action = "remove";
if (!node.getType().equals(NodeIDType.OPENFLOW)) {
- return new Status(StatusCode.NOTACCEPTABLE,
- errorString("send", action, "Invalid node type"));
+ return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
+ action, "Invalid node type"));
}
if (controller != null) {
ISwitch sw = controller.getSwitch((Long) node.getID());
if (sw != null) {
OFMessage msg = new FlowConverter(flow).getOFFlowMod(
OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
- Object result = sw.syncSend(msg);
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ?
- new Status(StatusCode.SUCCESS, null)
- : new Status(StatusCode.TIMEOUT,
- errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, Utils
- .getOFErrorString((OFError) result)));
+ Object result;
+ if (rid == 0) {
+ /*
+ * Synchronous message send. Each message is followed by a
+ * Barrier message.
+ */
+ result = sw.syncSend(msg);
} else {
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal Error"));
+ /*
+ * Message will be sent asynchronously. A Barrier message
+ * will be inserted automatically to synchronize the
+ * progression.
+ */
+ result = asyncMsgSend(node, sw, msg, rid);
}
+ return getStatusInternal(result, action, rid);
} else {
- return new Status(StatusCode.GONE, errorString("send", action,
+ return new Status(StatusCode.GONE, errorString("send", action,
"Switch is not available"));
}
}
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal plugin error"));
+ return new Status(StatusCode.INTERNALERROR, errorString("send", action,
+ "Internal plugin error"));
}
@Override
public Status removeAllFlows(Node node) {
- return new Status(StatusCode.SUCCESS, null);
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ return new Status(StatusCode.SUCCESS);
}
private String errorString(String phase, String action, String cause) {
+ " flow message: " : action + " the flow: ") + cause;
}
+ @Override
+ public void receive(ISwitch sw, OFMessage msg) {
+ if (msg instanceof OFFlowRemoved) {
+ handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
+ } else if (msg instanceof OFError) {
+ handleErrorMessage(sw, (OFError) msg);
+ }
+ }
+
+ private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
+ Node node = NodeCreator.createOFNode(sw.getId());
+ Flow flow = new FlowConverter(msg.getMatch(),
+ new ArrayList<OFAction>(0)).getFlow(node);
+ flow.setPriority(msg.getPriority());
+ flow.setIdleTimeout(msg.getIdleTimeout());
+ flow.setId(msg.getCookie());
+
+ Match match = flow.getMatch();
+ NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
+ .getField(MatchType.IN_PORT).getValue() : null;
+
+ for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
+ .entrySet()) {
+ String container = containerNotifier.getKey();
+ IFlowProgrammerNotifier notifier = containerNotifier.getValue();
+ /*
+ * Switch only provide us with the match information. For now let's
+ * try to identify the container membership only from the input port
+ * match field. In any case, upper layer consumers can derive
+ * whether the notification was not for them. More sophisticated
+ * filtering can be added later on.
+ */
+ if (inPort == null
+ || container.equals(GlobalConstants.DEFAULT.toString())
+ || (containerToNc.containsKey(container) && containerToNc.get(container).contains(inPort))) {
+ notifier.flowRemoved(node, flow);
+ }
+ }
+ }
+
+ private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
+ Node node = NodeCreator.createOFNode(sw.getId());
+ OFMessage offendingMsg = errorMsg.getOffendingMsg();
+ Integer xid;
+ if (offendingMsg != null) {
+ xid = offendingMsg.getXid();
+ } else {
+ xid = errorMsg.getXid();
+ }
+
+ Long rid = getMessageRid(sw.getId(), xid);
+ /*
+ * Null or zero requestId indicates that the error message is meant for
+ * a sync message. It will be handled by the sync message worker thread.
+ * Hence we are done here.
+ */
+ if ((rid == null) || (rid == 0)) {
+ return;
+ }
+
+ /*
+ * Notifies the caller that error has been reported for a previous flow
+ * programming request
+ */
+ for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
+ .entrySet()) {
+ IFlowProgrammerNotifier notifier = containerNotifier.getValue();
+ notifier.flowErrorReported(node, rid, Utils.getOFErrorString(errorMsg));
+ }
+ }
+
+ @Override
+ public void tagUpdated(String containerName, Node n, short oldTag,
+ short newTag, UpdateType t) {
+
+ }
+
+ @Override
+ public void containerFlowUpdated(String containerName,
+ ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
+ }
+
+ @Override
+ public void nodeConnectorUpdated(String containerName, NodeConnector p,
+ UpdateType type) {
+ switch (type) {
+ case ADDED:
+ if (!containerToNc.containsKey(containerName)) {
+ containerToNc.put(containerName, new HashSet<NodeConnector>());
+ }
+ containerToNc.get(containerName).add(p);
+ break;
+ case CHANGED:
+ break;
+ case REMOVED:
+ Set<NodeConnector> target = containerToNc.get(containerName);
+ if (target != null) {
+ target.remove(p);
+ }
+ break;
+ default:
+ }
+ }
+
+ @Override
+ public void containerModeUpdated(UpdateType t) {
+
+ }
+
+ @Override
+ public Status syncSendBarrierMessage(Node node) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The node does not support Barrier message.");
+ }
+
+ if (controller != null) {
+ long swid = (Long) node.getID();
+ ISwitch sw = controller.getSwitch(swid);
+ if (sw != null) {
+ sw.syncSendBarrierMessage();
+ clearXid2Rid(swid);
+ return (new Status(StatusCode.SUCCESS));
+ } else {
+ return new Status(StatusCode.GONE,
+ "The node does not have a valid Switch reference.");
+ }
+ }
+ return new Status(StatusCode.INTERNALERROR,
+ "Failed to send Barrier message.");
+ }
+
+ @Override
+ public Status asyncSendBarrierMessage(Node node) {
+ if (!connectionOutService.isLocal(node)) {
+ log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
+ return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+ }
+
+ if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The node does not support Barrier message.");
+ }
+
+ if (controller != null) {
+ long swid = (Long) node.getID();
+ ISwitch sw = controller.getSwitch(swid);
+ if (sw != null) {
+ sw.asyncSendBarrierMessage();
+ clearXid2Rid(swid);
+ return (new Status(StatusCode.SUCCESS));
+ } else {
+ return new Status(StatusCode.GONE,
+ "The node does not have a valid Switch reference.");
+ }
+ }
+ return new Status(StatusCode.INTERNALERROR,
+ "Failed to send Barrier message.");
+ }
+
+ /**
+ * This method sends the message asynchronously until the number of messages
+ * sent reaches a threshold. Then a Barrier message is sent automatically
+ * for sync purpose. An unique Request ID associated with the message is
+ * passed down by the caller. The Request ID will be returned to the caller
+ * when an error message is received from the switch.
+ *
+ * @param node
+ * The node
+ * @param msg
+ * The switch
+ * @param msg
+ * The OF message to be sent
+ * @param rid
+ * The Request Id
+ * @return result
+ */
+ private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
+ Object result = Boolean.TRUE;
+ long swid = (Long) node.getID();
+ int xid;
+
+ xid = sw.asyncSend(msg);
+ addXid2Rid(swid, xid, rid);
+
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+ if (swxid2rid == null) {
+ return result;
+ }
+
+ int size = swxid2rid.size();
+ if (size % barrierMessagePriorCount == 0) {
+ result = asyncSendBarrierMessage(node);
+ }
+
+ return result;
+ }
+
+ /**
+ * A number of async messages are sent followed by a synchronous Barrier
+ * message. This method returns the maximum async messages that can be sent
+ * before the Barrier message.
+ *
+ * @return The max count of async messages sent prior to Barrier message
+ */
+ private int getBarrierMessagePriorCount() {
+ String count = System.getProperty("of.barrierMessagePriorCount");
+ int rv = 100;
+
+ if (count != null) {
+ try {
+ rv = Integer.parseInt(count);
+ } catch (Exception e) {
+ }
+ }
+
+ return rv;
+ }
+
+ /**
+ * This method returns the message Request ID previously assigned by the
+ * caller for a given OF message xid
+ *
+ * @param swid
+ * The switch id
+ * @param xid
+ * The OF message xid
+ * @return The Request ID
+ */
+ private Long getMessageRid(long swid, Integer xid) {
+ Long rid = null;
+
+ if (xid == null) {
+ return rid;
+ }
+
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+ if (swxid2rid != null) {
+ rid = swxid2rid.get(xid);
+ }
+ return rid;
+ }
+
+ /**
+ * This method returns a copy of outstanding xid to rid mappings.for a given
+ * switch
+ *
+ * @param swid
+ * The switch id
+ * @return a copy of xid2rid mappings
+ */
+ public Map<Integer, Long> getSwXid2Rid(long swid) {
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+
+ if (swxid2rid != null) {
+ return new HashMap<Integer, Long>(swxid2rid);
+ } else {
+ return new HashMap<Integer, Long>();
+ }
+ }
+
+ /**
+ * Adds xid to rid mapping to the local DB
+ *
+ * @param swid
+ * The switch id
+ * @param xid
+ * The OF message xid
+ * @param rid
+ * The message Request ID
+ */
+ private void addXid2Rid(long swid, int xid, long rid) {
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+ if (swxid2rid != null) {
+ swxid2rid.put(xid, rid);
+ }
+ }
+
+ /**
+ * When an Error message is received, this method will be invoked to remove
+ * the offending xid from the local DB.
+ *
+ * @param swid
+ * The switch id
+ * @param xid
+ * The OF message xid
+ */
+ private void removeXid2Rid(long swid, int xid) {
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+ if (swxid2rid != null) {
+ swxid2rid.remove(xid);
+ }
+ }
+
+ /**
+ * Convert various result into Status
+ *
+ * @param result
+ * The returned result from previous action
+ * @param action
+ * add/modify/delete flow action
+ * @param rid
+ * The Request ID associated with the flow message
+ * @return Status
+ */
+ private Status getStatusInternal(Object result, String action, long rid) {
+ if (result instanceof Boolean) {
+ return ((Boolean) result == Boolean.TRUE) ? new Status(
+ StatusCode.SUCCESS, rid) : new Status(
+ StatusCode.TIMEOUT, errorString(null, action,
+ "Request Timed Out"));
+ } else if (result instanceof Status) {
+ return (Status) result;
+ } else if (result instanceof OFError) {
+ OFError res = (OFError) result;
+ return new Status(StatusCode.INTERNALERROR, errorString(
+ "program", action, Utils.getOFErrorString(res)));
+ } else {
+ return new Status(StatusCode.INTERNALERROR, errorString(
+ "send", action, "Internal Error"));
+ }
+ }
+
+ /**
+ * When a Barrier reply is received, this method will be invoked to clear
+ * the local DB
+ *
+ * @param swid
+ * The switch id
+ */
+ private void clearXid2Rid(long swid) {
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+ if (swxid2rid != null) {
+ swxid2rid.clear();
+ }
+ }
+
+ @Override
+ public void updateNode(Node node, UpdateType type, Set<Property> props) {
+ long swid = (Long)node.getID();
+
+ switch (type) {
+ case ADDED:
+ Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
+ this.xid2rid.put(swid, swxid2rid);
+ break;
+ case CHANGED:
+ break;
+ case REMOVED:
+ this.xid2rid.remove(swid);
+ break;
+ default:
+ }
+ }
+
+ @Override
+ public void updateNodeConnector(NodeConnector nodeConnector,
+ UpdateType type, Set<Property> props) {
+ }
+
+ private void registerWithOSGIConsole() {
+ BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
+ .getBundleContext();
+ bundleContext.registerService(CommandProvider.class.getName(), this,
+ null);
+ }
+
+ @Override
+ public String getHelp() {
+ StringBuffer help = new StringBuffer();
+ help.append("-- Flow Programmer Service --\n");
+ help.append("\t px2r <node id> - Print outstanding xid2rid mappings for a given node id\n");
+ help.append("\t px2rc - Print max num of async msgs prior to the Barrier\n");
+ return help.toString();
+ }
+
+ public void _px2r(CommandInterpreter ci) {
+ String st = ci.nextArgument();
+ if (st == null) {
+ ci.println("Please enter a valid node id");
+ return;
+ }
+
+ long sid;
+ try {
+ sid = HexEncode.stringToLong(st);
+ } catch (NumberFormatException e) {
+ ci.println("Please enter a valid node id");
+ return;
+ }
+
+ Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
+ if (swxid2rid == null) {
+ ci.println("The node id entered does not exist");
+ return;
+ }
+
+ ci.println("xid rid");
+
+ Set<Integer> xidSet = swxid2rid.keySet();
+ if (xidSet == null) {
+ return;
+ }
+
+ for (Integer xid : xidSet) {
+ ci.println(xid + " " + swxid2rid.get(xid));
+ }
+ }
+
+ public void _px2rc(CommandInterpreter ci) {
+ ci.println("Max num of async messages sent prior to the Barrier message is "
+ + barrierMessagePriorCount);
+ }
+
+ @Override
+ public void containerCreate(String containerName) {
+ // do nothing
+ }
+
+ @Override
+ public void containerDestroy(String containerName) {
+ containerToNc.remove(containerName);
+ }
}