- Plugin sends Barrier msg every 100 async msgs (configurable thru config.ini: of...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / FlowProgrammerService.java
index 2926c22c43f43d4c903157a4f09a1a6758c422a1..230376555eb556a58ecee41892e3cde47b7aa5c0 100644 (file)
@@ -10,13 +10,17 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.eclipse.osgi.framework.console.CommandInterpreter;
+import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
+import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
@@ -34,15 +38,19 @@ import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
 import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.match.MatchType;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.NodeCreator;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.sal.utils.Status;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,16 +59,20 @@ import org.slf4j.LoggerFactory;
  * the flow programming and relay them to functional modules above SAL.
  */
 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
-        IMessageListener, IContainerListener {
+        IMessageListener, IContainerListener, IInventoryShimExternalListener,
+        CommandProvider {
     private static final Logger log = LoggerFactory
             .getLogger(FlowProgrammerService.class);
     private IController controller;
     private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
     private Map<String, Set<NodeConnector>> containerToNc;
+    private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
+    private int barrierMessagePriorCount = getBarrierMessagePriorCount();
 
     public FlowProgrammerService() {
         controller = null;
         flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
+        xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
     }
 
     public void setController(IController core) {
@@ -104,6 +116,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
      */
     void init() {
         this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
+        registerWithOSGIConsole();
     }
 
     /**
@@ -134,6 +147,36 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status addFlow(Node node, Flow flow) {
+        return addFlowInternal(node, flow, 0);
+    }
+
+    @Override
+    public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+        return modifyFlowInternal(node, oldFlow, newFlow, 0);
+    }
+
+    @Override
+    public Status removeFlow(Node node, Flow flow) {
+        return removeFlowInternal(node, flow, 0);
+    }
+
+    @Override
+    public Status addFlowAsync(Node node, Flow flow, long rid) {
+        return addFlowInternal(node, flow, rid);
+    }
+
+    @Override
+    public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
+            long rid) {
+        return modifyFlowInternal(node, oldFlow, newFlow, rid);
+    }
+
+    @Override
+    public Status removeFlowAsync(Node node, Flow flow, long rid) {
+        return removeFlowInternal(node, flow, rid);
+    }
+
+    private Status addFlowInternal(Node node, Flow flow, long rid) {
         String action = "add";
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
@@ -146,10 +189,21 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                 FlowConverter x = new FlowConverter(flow);
                 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
 
-                /*
-                 * Synchronous message send
-                 */
-                Object result = sw.syncSend(msg);
+                Object result;
+                if (rid == 0) {
+                    /*
+                     * Synchronous message send. Each message is followed by a
+                     * Barrier message.
+                     */
+                    result = sw.syncSend(msg);
+                } else {
+                    /*
+                     * Message will be sent asynchronously. A Barrier message
+                     * will be inserted automatically to synchronize the
+                     * progression.
+                     */
+                    result = asyncMsgSend(node, sw, msg, rid);  
+                }
                 if (result instanceof Boolean) {
                     return ((Boolean) result == Boolean.TRUE) ? new Status(
                             StatusCode.SUCCESS, null) : new Status(
@@ -183,8 +237,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                 "Internal plugin error"));
     }
 
-    @Override
-    public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+    private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
         String action = "modify";
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
@@ -211,7 +264,21 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                  * Synchronous message send
                  */
                 action = (msg2 == null) ? "modify" : "delete";
-                Object result = sw.syncSend(msg1);
+                Object result;
+                if (rid == 0) {
+                    /*
+                     * Synchronous message send. Each message is followed by a
+                     * Barrier message.
+                     */
+                    result = sw.syncSend(msg1);
+                } else {
+                    /*
+                     * Message will be sent asynchronously. A Barrier message
+                     * will be inserted automatically to synchronize the
+                     * progression.
+                     */
+                    result = asyncMsgSend(node, sw, msg1, rid);
+                }
                 if (result instanceof Boolean) {
                     if ((Boolean) result == Boolean.FALSE) {
                         return new Status(StatusCode.TIMEOUT, errorString(null,
@@ -230,7 +297,20 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
                 if (msg2 != null) {
                     action = "add";
-                    result = sw.syncSend(msg2);
+                    if (rid == 0) {
+                        /*
+                         * Synchronous message send. Each message is followed by a
+                         * Barrier message.
+                         */
+                        result = sw.syncSend(msg2);
+                    } else {
+                        /*
+                         * Message will be sent asynchronously. A Barrier message
+                         * will be inserted automatically to synchronize the
+                         * progression.
+                         */
+                        result = asyncMsgSend(node, sw, msg2, rid);
+                    }
                     if (result instanceof Boolean) {
                         return ((Boolean) result == Boolean.TRUE) ? new Status(
                                 StatusCode.SUCCESS, null) : new Status(
@@ -254,8 +334,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                 "Internal plugin error"));
     }
 
-    @Override
-    public Status removeFlow(Node node, Flow flow) {
+    private Status removeFlowInternal(Node node, Flow flow, long rid) {
         String action = "remove";
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
@@ -266,7 +345,21 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             if (sw != null) {
                 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
                         OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
-                Object result = sw.syncSend(msg);
+                Object result;
+                if (rid == 0) {
+                    /*
+                     * Synchronous message send. Each message is followed by a
+                     * Barrier message.
+                     */
+                    result = sw.syncSend(msg);
+                } else {
+                    /*
+                     * Message will be sent asynchronously. A Barrier message
+                     * will be inserted automatically to synchronize the
+                     * progression.
+                     */
+                    result = asyncMsgSend(node, sw, msg, rid);
+                }
                 if (result instanceof Boolean) {
                     return ((Boolean) result == Boolean.TRUE) ? new Status(
                             StatusCode.SUCCESS, null) : new Status(
@@ -371,11 +464,254 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             break;
         default:
         }
-
     }
 
     @Override
     public void containerModeUpdated(UpdateType t) {
 
     }
+
+    @Override
+    public Status sendBarrierMessage(Node node) {
+        if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+            return new Status(StatusCode.NOTACCEPTABLE,
+                    "The node does not support Barrier message.");
+        }
+
+        if (controller != null) {
+            long swid = (Long) node.getID();
+            ISwitch sw = controller.getSwitch(swid);
+            if (sw != null) {
+                sw.sendBarrierMessage();
+                clearXid2Rid(swid);
+                return (new Status(StatusCode.SUCCESS, null));
+            } else {
+                return new Status(StatusCode.GONE,
+                        "The node does not have a valid Switch reference.");
+            }
+        }
+        return new Status(StatusCode.INTERNALERROR,
+                "Failed to send Barrier message.");
+    }
+    
+    /**
+     * This method sends the message asynchronously until the number of messages
+     * sent reaches a threshold. Then a Barrier message is sent automatically
+     * for sync purpose. An unique Request ID associated with the message is
+     * passed down by the caller. The Request ID will be returned to the caller
+     * when an error message is received from the switch.
+     * 
+     * @param node
+     *            The node
+     * @param msg
+     *            The switch
+     * @param msg
+     *            The OF message to be sent
+     * @param rid
+     *            The Request Id
+     * @return result
+     */
+    private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
+        Object result = Boolean.TRUE;
+        long swid = (Long) node.getID();
+        int xid;
+
+        xid = sw.asyncSend(msg);
+        addXid2Rid(swid, xid, rid);
+        
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        if (swxid2rid == null) {
+            return result;
+        }
+        
+        int size = swxid2rid.size();
+        if (size % barrierMessagePriorCount == 0) {
+            result = sendBarrierMessage(node);
+        }
+        
+        return result;
+    }
+    
+    /**
+     * A number of async messages are sent followed by a synchronous Barrier
+     * message. This method returns the maximum async messages that can be sent
+     * before the Barrier message.
+     * 
+     * @return The max count of async messages sent prior to Barrier message
+     */
+    private int getBarrierMessagePriorCount() {
+        String count = System.getProperty("of.barrierMessagePriorCount");
+        int rv = 100;
+
+        if (count != null) {
+            try {
+                rv = Integer.parseInt(count);
+            } catch (Exception e) {
+            }
+        }
+
+        return rv;
+    }
+    
+    /**
+     * This method returns the message Request ID previously assigned by the
+     * caller for a given OF message xid
+     * 
+     * @param swid
+     *            The switch id
+     * @param xid
+     *            The OF message xid
+     * @return The Request ID
+     */
+    public long getMessageRid(long swid, int xid) {
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        long rid = 0;
+        
+        if (swxid2rid != null) {
+            rid = swxid2rid.get(xid);
+        }
+        return rid;
+    }
+
+    /**
+     * This method returns a copy of outstanding xid to rid mappings.for a given
+     * switch
+     * 
+     * @param swid
+     *            The switch id
+     * @return a copy of xid2rid mappings
+     */
+    public Map<Integer, Long> getSwXid2Rid(long swid) {
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        
+        if (swxid2rid != null) {
+            return new HashMap<Integer, Long>(swxid2rid);
+        } else {
+            return new HashMap<Integer, Long>();
+        }
+    }
+
+    /**
+     * Adds xid to rid mapping to the local DB
+     * 
+     * @param swid
+     *            The switch id
+     * @param xid
+     *            The OF message xid
+     * @param rid
+     *            The message Request ID
+     */
+    private void addXid2Rid(long swid, int xid, long rid) {
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        if (swxid2rid != null) {
+            swxid2rid.put(xid, rid);
+        }
+    }
+
+    /**
+     * When an Error message is received, this method will be invoked to remove
+     * the offending xid from the local DB.
+     * 
+     * @param swid
+     *            The switch id
+     * @param xid
+     *            The OF message xid
+     */
+    private void removeXid2Rid(long swid, int xid) {
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        if (swxid2rid != null) {
+            swxid2rid.remove(xid);
+        }
+    }
+
+    /**
+     * When a Barrier reply is received, this method will be invoked to clear
+     * the local DB
+     * 
+     * @param swid
+     *            The switch id
+     */
+    private void clearXid2Rid(long swid) {
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
+        if (swxid2rid != null) {
+            swxid2rid.clear();
+        }
+    }
+
+    @Override
+    public void updateNode(Node node, UpdateType type, Set<Property> props) {
+        long swid = (Long)node.getID();
+        
+        switch (type) {
+        case ADDED:
+            Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
+            this.xid2rid.put(swid, swxid2rid);
+            break;
+        case CHANGED:
+            break;
+        case REMOVED:
+            this.xid2rid.remove(swid);
+            break;
+        default:
+        }
+    }
+
+    @Override
+    public void updateNodeConnector(NodeConnector nodeConnector,
+            UpdateType type, Set<Property> props) {
+    }
+
+    private void registerWithOSGIConsole() {
+        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
+                .getBundleContext();
+        bundleContext.registerService(CommandProvider.class.getName(), this,
+                null);
+    }
+
+    @Override
+    public String getHelp() {
+        StringBuffer help = new StringBuffer();
+        help.append("-- Flow Programmer Service --\n");
+        help.append("\t px2r <node id>          - Print outstanding xid2rid mappings for a given node id\n");
+        help.append("\t px2rc                   - Print max num of async msgs prior to the Barrier\n");
+        return help.toString();
+    }
+
+    public void _px2r(CommandInterpreter ci) {
+        String st = ci.nextArgument();
+        if (st == null) {
+            ci.println("Please enter a valid node id");
+            return;
+        }
+        
+        long sid;
+        try {
+            sid = HexEncode.stringToLong(st);
+        } catch (NumberFormatException e) {
+            ci.println("Please enter a valid node id");
+            return;
+        }
+        
+        Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
+        if (swxid2rid == null) {
+            ci.println("The node id entered does not exist");
+            return;
+        }
+
+        ci.println("xid             rid");
+        
+        Set<Integer> xidSet = swxid2rid.keySet();
+        if (xidSet == null) {
+            return;
+        }
+
+        for (Integer xid : xidSet) {
+            ci.println(xid + "       " + swxid2rid.get(xid));
+        }
+    }
+
+    public void _px2rc(CommandInterpreter ci) {
+        ci.println("Max num of async messages sent prior to the Barrier message is "
+                + barrierMessagePriorCount);
+    }
 }