Fix for cache cleanup in protocol plugin on container deletion
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / FlowProgrammerService.java
index 1d19c8b45c039e90e8e6bd859d03fe9312e712e3..88b75b196dcec8f370a6119130d21f3578aa89ab 100644 (file)
@@ -23,7 +23,9 @@ import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExtern
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
@@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory;
  */
 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         IMessageListener, IContainerListener, IInventoryShimExternalListener,
-        CommandProvider {
+        CommandProvider, IContainerAware {
     private static final Logger log = LoggerFactory
             .getLogger(FlowProgrammerService.class);
     private IController controller;
@@ -65,6 +67,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     private Map<String, Set<NodeConnector>> containerToNc;
     private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
     private int barrierMessagePriorCount = getBarrierMessagePriorCount();
+    private IPluginOutConnectionService connectionOutService;
 
     public FlowProgrammerService() {
         controller = null;
@@ -83,6 +86,16 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     public void setFlowProgrammerNotifier(Map<String, ?> props,
             IFlowProgrammerNotifier s) {
         if (props == null || props.get("containerName") == null) {
@@ -110,7 +123,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
-     * 
+     *
      */
     void init() {
         this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
@@ -122,7 +135,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
      * example bundle is being stopped.
-     * 
+     *
      */
     void destroy() {
     }
@@ -130,7 +143,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     /**
      * Function called by dependency manager after "init ()" is called and after
      * the services provided by the class are registered in the service registry
-     * 
+     *
      */
     void start() {
     }
@@ -139,39 +152,69 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
      * Function called by the dependency manager before the services exported by
      * the component are unregistered, this will be followed by a "destroy ()"
      * calls
-     * 
+     *
      */
     void stop() {
     }
 
     @Override
     public Status addFlow(Node node, Flow flow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Add flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return addFlowInternal(node, flow, 0);
     }
 
     @Override
     public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Modify flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return modifyFlowInternal(node, oldFlow, newFlow, 0);
     }
 
     @Override
     public Status removeFlow(Node node, Flow flow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return removeFlowInternal(node, flow, 0);
     }
 
     @Override
     public Status addFlowAsync(Node node, Flow flow, long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return addFlowInternal(node, flow, rid);
     }
 
     @Override
     public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
             long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return modifyFlowInternal(node, oldFlow, newFlow, rid);
     }
 
     @Override
     public Status removeFlowAsync(Node node, Flow flow, long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return removeFlowInternal(node, flow, rid);
     }
 
@@ -201,21 +244,9 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                      * will be inserted automatically to synchronize the
                      * progression.
                      */
-                    result = asyncMsgSend(node, sw, msg, rid);  
-                }
-                if (result instanceof Boolean) {
-                    return ((Boolean) result == Boolean.TRUE) ? new Status(
-                            StatusCode.SUCCESS, rid) : new Status(
-                            StatusCode.TIMEOUT, errorString(null, action,
-                                    "Request Timed Out"));
-                } else if (result instanceof OFError) {
-                    OFError res = (OFError) result;
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "program", action, Utils.getOFErrorString(res)));
-                } else {
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "send", action, "Internal Error"));
+                    result = asyncMsgSend(node, sw, msg, rid);
                 }
+                return getStatusInternal(result, action, rid);
             } else {
                 return new Status(StatusCode.GONE, errorString("send", action,
                         "Switch is not available"));
@@ -267,52 +298,28 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                      */
                     result = asyncMsgSend(node, sw, msg1, rid);
                 }
-                if (result instanceof Boolean) {
-                    if ((Boolean) result == Boolean.FALSE) {
-                        return new Status(StatusCode.TIMEOUT, errorString(null,
-                                action, "Request Timed Out"));
-                    } else if (msg2 == null) {
-                        return new Status(StatusCode.SUCCESS, rid);
-                    }
-                } else if (result instanceof OFError) {
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "program", action,
-                            Utils.getOFErrorString((OFError) result)));
-                } else {
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "send", action, "Internal Error"));
+
+                Status rv = getStatusInternal(result, action, rid);
+                if ((msg2 == null) || !rv.isSuccess()) {
+                    return rv;
                 }
 
-                if (msg2 != null) {
-                    action = "add";
-                    if (rid == 0) {
-                        /*
-                         * Synchronous message send. Each message is followed by a
-                         * Barrier message.
-                         */
-                        result = sw.syncSend(msg2);
-                    } else {
-                        /*
-                         * Message will be sent asynchronously. A Barrier message
-                         * will be inserted automatically to synchronize the
-                         * progression.
-                         */
-                        result = asyncMsgSend(node, sw, msg2, rid);
-                    }
-                    if (result instanceof Boolean) {
-                        return ((Boolean) result == Boolean.TRUE) ? new Status(
-                                StatusCode.SUCCESS, rid) : new Status(
-                                StatusCode.TIMEOUT, errorString(null, action,
-                                        "Request Timed Out"));
-                    } else if (result instanceof OFError) {
-                        return new Status(StatusCode.INTERNALERROR,
-                                errorString("program", action, Utils
-                                        .getOFErrorString((OFError) result)));
-                    } else {
-                        return new Status(StatusCode.INTERNALERROR,
-                                errorString("send", action, "Internal Error"));
-                    }
+                action = "add";
+                if (rid == 0) {
+                    /*
+                     * Synchronous message send. Each message is followed by a
+                     * Barrier message.
+                     */
+                    result = sw.syncSend(msg2);
+                } else {
+                    /*
+                     * Message will be sent asynchronously. A Barrier message
+                     * will be inserted automatically to synchronize the
+                     * progression.
+                     */
+                    result = asyncMsgSend(node, sw, msg2, rid);
                 }
+                return getStatusInternal(result, action, rid);
             } else {
                 return new Status(StatusCode.GONE, errorString("send", action,
                         "Switch is not available"));
@@ -348,19 +355,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
                      */
                     result = asyncMsgSend(node, sw, msg, rid);
                 }
-                if (result instanceof Boolean) {
-                    return ((Boolean) result == Boolean.TRUE) ? new Status(
-                            StatusCode.SUCCESS, rid) : new Status(
-                            StatusCode.TIMEOUT, errorString(null, action,
-                                    "Request Timed Out"));
-                } else if (result instanceof OFError) {
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "program", action,
-                            Utils.getOFErrorString((OFError) result)));
-                } else {
-                    return new Status(StatusCode.INTERNALERROR, errorString(
-                            "send", action, "Internal Error"));
-                }
+                return getStatusInternal(result, action, rid);
             } else {
                 return new Status(StatusCode.GONE, errorString("send", action,
                         "Switch is not available"));
@@ -372,7 +367,12 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status removeAllFlows(Node node) {
-        return new Status(StatusCode.SUCCESS, null);
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
+        return new Status(StatusCode.SUCCESS);
     }
 
     private String errorString(String phase, String action, String cause) {
@@ -440,7 +440,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         if ((rid == null) || (rid == 0)) {
             return;
         }
-        
+
         /*
          * Notifies the caller that error has been reported for a previous flow
          * programming request
@@ -466,8 +466,6 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     @Override
     public void nodeConnectorUpdated(String containerName, NodeConnector p,
             UpdateType type) {
-        Set<NodeConnector> target = null;
-
         switch (type) {
         case ADDED:
             if (!containerToNc.containsKey(containerName)) {
@@ -478,7 +476,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         case CHANGED:
             break;
         case REMOVED:
-            target = containerToNc.get(containerName);
+            Set<NodeConnector> target = containerToNc.get(containerName);
             if (target != null) {
                 target.remove(p);
             }
@@ -493,7 +491,12 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     }
 
     @Override
-    public Status sendBarrierMessage(Node node) {
+    public Status syncSendBarrierMessage(Node node) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE,
                     "The node does not support Barrier message.");
@@ -503,9 +506,9 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             long swid = (Long) node.getID();
             ISwitch sw = controller.getSwitch(swid);
             if (sw != null) {
-                sw.sendBarrierMessage();
+                sw.syncSendBarrierMessage();
                 clearXid2Rid(swid);
-                return (new Status(StatusCode.SUCCESS, null));
+                return (new Status(StatusCode.SUCCESS));
             } else {
                 return new Status(StatusCode.GONE,
                         "The node does not have a valid Switch reference.");
@@ -514,14 +517,42 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         return new Status(StatusCode.INTERNALERROR,
                 "Failed to send Barrier message.");
     }
-    
+
+    @Override
+    public Status asyncSendBarrierMessage(Node node) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
+        if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+            return new Status(StatusCode.NOTACCEPTABLE,
+                    "The node does not support Barrier message.");
+        }
+
+        if (controller != null) {
+            long swid = (Long) node.getID();
+            ISwitch sw = controller.getSwitch(swid);
+            if (sw != null) {
+                sw.asyncSendBarrierMessage();
+                clearXid2Rid(swid);
+                return (new Status(StatusCode.SUCCESS));
+            } else {
+                return new Status(StatusCode.GONE,
+                        "The node does not have a valid Switch reference.");
+            }
+        }
+        return new Status(StatusCode.INTERNALERROR,
+                "Failed to send Barrier message.");
+    }
+
     /**
      * This method sends the message asynchronously until the number of messages
      * sent reaches a threshold. Then a Barrier message is sent automatically
      * for sync purpose. An unique Request ID associated with the message is
      * passed down by the caller. The Request ID will be returned to the caller
      * when an error message is received from the switch.
-     * 
+     *
      * @param node
      *            The node
      * @param msg
@@ -539,25 +570,25 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
         xid = sw.asyncSend(msg);
         addXid2Rid(swid, xid, rid);
-        
+
         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
         if (swxid2rid == null) {
             return result;
         }
-        
+
         int size = swxid2rid.size();
         if (size % barrierMessagePriorCount == 0) {
-            result = sendBarrierMessage(node);
+            result = asyncSendBarrierMessage(node);
         }
-        
+
         return result;
     }
-    
+
     /**
      * A number of async messages are sent followed by a synchronous Barrier
      * message. This method returns the maximum async messages that can be sent
      * before the Barrier message.
-     * 
+     *
      * @return The max count of async messages sent prior to Barrier message
      */
     private int getBarrierMessagePriorCount() {
@@ -573,11 +604,11 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
         return rv;
     }
-    
+
     /**
      * This method returns the message Request ID previously assigned by the
      * caller for a given OF message xid
-     * 
+     *
      * @param swid
      *            The switch id
      * @param xid
@@ -601,14 +632,14 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     /**
      * This method returns a copy of outstanding xid to rid mappings.for a given
      * switch
-     * 
+     *
      * @param swid
      *            The switch id
      * @return a copy of xid2rid mappings
      */
     public Map<Integer, Long> getSwXid2Rid(long swid) {
         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
-        
+
         if (swxid2rid != null) {
             return new HashMap<Integer, Long>(swxid2rid);
         } else {
@@ -618,7 +649,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     /**
      * Adds xid to rid mapping to the local DB
-     * 
+     *
      * @param swid
      *            The switch id
      * @param xid
@@ -636,7 +667,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     /**
      * When an Error message is received, this method will be invoked to remove
      * the offending xid from the local DB.
-     * 
+     *
      * @param swid
      *            The switch id
      * @param xid
@@ -649,10 +680,39 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         }
     }
 
+    /**
+     * Convert various result into Status
+     *
+     * @param result
+     *            The returned result from previous action
+     * @param action
+     *            add/modify/delete flow action
+     * @param rid
+     *            The Request ID associated with the flow message
+     * @return Status
+     */
+    private Status getStatusInternal(Object result, String action, long rid) {
+        if (result instanceof Boolean) {
+            return ((Boolean) result == Boolean.TRUE) ? new Status(
+                    StatusCode.SUCCESS, rid) : new Status(
+                    StatusCode.TIMEOUT, errorString(null, action,
+                            "Request Timed Out"));
+        } else if (result instanceof Status) {
+            return (Status) result;
+        } else if (result instanceof OFError) {
+            OFError res = (OFError) result;
+            return new Status(StatusCode.INTERNALERROR, errorString(
+                    "program", action, Utils.getOFErrorString(res)));
+        } else {
+            return new Status(StatusCode.INTERNALERROR, errorString(
+                    "send", action, "Internal Error"));
+        }
+    }
+
     /**
      * When a Barrier reply is received, this method will be invoked to clear
      * the local DB
-     * 
+     *
      * @param swid
      *            The switch id
      */
@@ -666,7 +726,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     @Override
     public void updateNode(Node node, UpdateType type, Set<Property> props) {
         long swid = (Long)node.getID();
-        
+
         switch (type) {
         case ADDED:
             Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
@@ -708,7 +768,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             ci.println("Please enter a valid node id");
             return;
         }
-        
+
         long sid;
         try {
             sid = HexEncode.stringToLong(st);
@@ -716,7 +776,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             ci.println("Please enter a valid node id");
             return;
         }
-        
+
         Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
         if (swxid2rid == null) {
             ci.println("The node id entered does not exist");
@@ -724,7 +784,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         }
 
         ci.println("xid             rid");
-        
+
         Set<Integer> xidSet = swxid2rid.keySet();
         if (xidSet == null) {
             return;
@@ -739,4 +799,14 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         ci.println("Max num of async messages sent prior to the Barrier message is "
                 + barrierMessagePriorCount);
     }
+
+    @Override
+    public void containerCreate(String containerName) {
+        // do nothing
+    }
+
+    @Override
+    public void containerDestroy(String containerName) {
+        containerToNc.remove(containerName);
+    }
 }