package org.opendaylight.controller.protocol_plugin.openflow.internal;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Error;
-import org.openflow.protocol.OFError;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFFlowRemoved;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.OFAction;
-
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.openflow.protocol.OFError;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFFlowRemoved;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
public FlowProgrammerService() {
controller = null;
flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
+ containerToNc = new HashMap<String, Set<NodeConnector>>();
xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
}
*/
void init() {
this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
+ this.controller.addMessageListener(OFType.ERROR, this);
registerWithOSGIConsole();
}
*/
result = asyncMsgSend(node, sw, msg, rid);
}
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ? new Status(
- StatusCode.SUCCESS, null) : new Status(
- StatusCode.TIMEOUT, errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- OFError res = (OFError) result;
- if (res.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
- V6Error er = new V6Error(res);
- byte[] b = res.getError();
- ByteBuffer bb = ByteBuffer.allocate(b.length);
- bb.put(b);
- bb.rewind();
- er.readFrom(bb);
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action,
- "Vendor Extension Internal Error"));
- }
- return new Status(StatusCode.INTERNALERROR, errorString(
- "program", action, Utils.getOFErrorString(res)));
- } else {
- return new Status(StatusCode.INTERNALERROR, errorString(
- "send", action, "Internal Error"));
- }
+ return getStatusInternal(result, action, rid);
} else {
return new Status(StatusCode.GONE, errorString("send", action,
"Switch is not available"));
*/
result = asyncMsgSend(node, sw, msg1, rid);
}
- if (result instanceof Boolean) {
- if ((Boolean) result == Boolean.FALSE) {
- return new Status(StatusCode.TIMEOUT, errorString(null,
- action, "Request Timed Out"));
- } else if (msg2 == null) {
- return new Status(StatusCode.SUCCESS, null);
- }
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR, errorString(
- "program", action,
- Utils.getOFErrorString((OFError) result)));
- } else {
- return new Status(StatusCode.INTERNALERROR, errorString(
- "send", action, "Internal Error"));
+
+ Status rv = getStatusInternal(result, action, rid);
+ if ((msg2 == null) || !rv.isSuccess()) {
+ return rv;
}
- if (msg2 != null) {
- action = "add";
- if (rid == 0) {
- /*
- * Synchronous message send. Each message is followed by a
- * Barrier message.
- */
- result = sw.syncSend(msg2);
- } else {
- /*
- * Message will be sent asynchronously. A Barrier message
- * will be inserted automatically to synchronize the
- * progression.
- */
- result = asyncMsgSend(node, sw, msg2, rid);
- }
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ? new Status(
- StatusCode.SUCCESS, null) : new Status(
- StatusCode.TIMEOUT, errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR,
- errorString("program", action, Utils
- .getOFErrorString((OFError) result)));
- } else {
- return new Status(StatusCode.INTERNALERROR,
- errorString("send", action, "Internal Error"));
- }
+ action = "add";
+ if (rid == 0) {
+ /*
+ * Synchronous message send. Each message is followed by a
+ * Barrier message.
+ */
+ result = sw.syncSend(msg2);
+ } else {
+ /*
+ * Message will be sent asynchronously. A Barrier message
+ * will be inserted automatically to synchronize the
+ * progression.
+ */
+ result = asyncMsgSend(node, sw, msg2, rid);
}
+ return getStatusInternal(result, action, rid);
} else {
return new Status(StatusCode.GONE, errorString("send", action,
"Switch is not available"));
*/
result = asyncMsgSend(node, sw, msg, rid);
}
- if (result instanceof Boolean) {
- return ((Boolean) result == Boolean.TRUE) ? new Status(
- StatusCode.SUCCESS, null) : new Status(
- StatusCode.TIMEOUT, errorString(null, action,
- "Request Timed Out"));
- } else if (result instanceof OFError) {
- return new Status(StatusCode.INTERNALERROR, errorString(
- "program", action,
- Utils.getOFErrorString((OFError) result)));
- } else {
- return new Status(StatusCode.INTERNALERROR, errorString(
- "send", action, "Internal Error"));
- }
+ return getStatusInternal(result, action, rid);
} else {
return new Status(StatusCode.GONE, errorString("send", action,
"Switch is not available"));
@Override
public Status removeAllFlows(Node node) {
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
}
private String errorString(String phase, String action, String cause) {
public void receive(ISwitch sw, OFMessage msg) {
if (msg instanceof OFFlowRemoved) {
handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
+ } else if (msg instanceof OFError) {
+ handleErrorMessage(sw, (OFError) msg);
}
}
}
}
+ private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
+ Node node = NodeCreator.createOFNode(sw.getId());
+ OFMessage offendingMsg = errorMsg.getOffendingMsg();
+ Integer xid;
+ if (offendingMsg != null) {
+ xid = offendingMsg.getXid();
+ } else {
+ xid = errorMsg.getXid();
+ }
+
+ Long rid = getMessageRid(sw.getId(), xid);
+ /*
+ * Null or zero requestId indicates that the error message is meant for
+ * a sync message. It will be handled by the sync message worker thread.
+ * Hence we are done here.
+ */
+ if ((rid == null) || (rid == 0)) {
+ return;
+ }
+
+ /*
+ * Notifies the caller that error has been reported for a previous flow
+ * programming request
+ */
+ for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
+ .entrySet()) {
+ IFlowProgrammerNotifier notifier = containerNotifier.getValue();
+ notifier.flowErrorReported(node, rid, errorMsg);
+ }
+ }
+
@Override
public void tagUpdated(String containerName, Node n, short oldTag,
short newTag, UpdateType t) {
}
@Override
- public Status sendBarrierMessage(Node node) {
+ public Status syncSendBarrierMessage(Node node) {
if (!node.getType().equals(NodeIDType.OPENFLOW)) {
return new Status(StatusCode.NOTACCEPTABLE,
"The node does not support Barrier message.");
long swid = (Long) node.getID();
ISwitch sw = controller.getSwitch(swid);
if (sw != null) {
- sw.sendBarrierMessage();
+ sw.syncSendBarrierMessage();
clearXid2Rid(swid);
- return (new Status(StatusCode.SUCCESS, null));
+ return (new Status(StatusCode.SUCCESS));
+ } else {
+ return new Status(StatusCode.GONE,
+ "The node does not have a valid Switch reference.");
+ }
+ }
+ return new Status(StatusCode.INTERNALERROR,
+ "Failed to send Barrier message.");
+ }
+
+ @Override
+ public Status asyncSendBarrierMessage(Node node) {
+ if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The node does not support Barrier message.");
+ }
+
+ if (controller != null) {
+ long swid = (Long) node.getID();
+ ISwitch sw = controller.getSwitch(swid);
+ if (sw != null) {
+ sw.asyncSendBarrierMessage();
+ clearXid2Rid(swid);
+ return (new Status(StatusCode.SUCCESS));
} else {
return new Status(StatusCode.GONE,
"The node does not have a valid Switch reference.");
int size = swxid2rid.size();
if (size % barrierMessagePriorCount == 0) {
- result = sendBarrierMessage(node);
+ result = asyncSendBarrierMessage(node);
}
return result;
* The OF message xid
* @return The Request ID
*/
- public long getMessageRid(long swid, int xid) {
+ private Long getMessageRid(long swid, Integer xid) {
+ Long rid = null;
+
+ if (xid == null) {
+ return rid;
+ }
+
Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
- long rid = 0;
-
if (swxid2rid != null) {
rid = swxid2rid.get(xid);
}
}
}
+ /**
+ * Convert various result into Status
+ *
+ * @param result
+ * The returned result from previous action
+ * @param action
+ * add/modify/delete flow action
+ * @param rid
+ * The Request ID associated with the flow message
+ * @return Status
+ */
+ private Status getStatusInternal(Object result, String action, long rid) {
+ if (result instanceof Boolean) {
+ return ((Boolean) result == Boolean.TRUE) ? new Status(
+ StatusCode.SUCCESS, rid) : new Status(
+ StatusCode.TIMEOUT, errorString(null, action,
+ "Request Timed Out"));
+ } else if (result instanceof Status) {
+ return (Status) result;
+ } else if (result instanceof OFError) {
+ OFError res = (OFError) result;
+ return new Status(StatusCode.INTERNALERROR, errorString(
+ "program", action, Utils.getOFErrorString(res)));
+ } else {
+ return new Status(StatusCode.INTERNALERROR, errorString(
+ "send", action, "Internal Error"));
+ }
+ }
+
/**
* When a Barrier reply is received, this method will be invoked to clear
* the local DB