Bug 73: Thread safety in FRM 06/2306/2
authorAlessandro Boch <aboch@cisco.com>
Thu, 31 Oct 2013 19:46:15 +0000 (12:46 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 4 Nov 2013 18:34:44 +0000 (18:34 +0000)
- Have nodeFlows and groupFlows non cluster caches
- Update the two above indirectly on every installedSwView cache update (local or remote)
- Have the update be performed by the FRM event thread only so that all the updates are streamlined

Change-Id: Ie5fa6ebee9433058d4028cff51ed1369ebd7b2a2
Signed-off-by: Alessandro Boch <aboch@cisco.com>
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java

index 98dc6865019ad725dec77758d2a3955fdff19a77..48651f7e60d7a7ed5be2a108c2c304e7ada5cb5b 100644 (file)
@@ -75,6 +75,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             Set<String> propSet = new HashSet<String>();
             propSet.add(ForwardingRulesManager.WORK_STATUS_CACHE);
             propSet.add(ForwardingRulesManager.WORK_ORDER_CACHE);
+            propSet.add(ForwardingRulesManager.INSTALLED_SW_VIEW_CACHE);
             props.put("cachenames", propSet);
 
             // export the service
index 520825762a40e8bfec9b89c5c0541c41edeedfc1..665ba7c635781968458dce7b026fc4bf6477d449 100644 (file)
@@ -102,8 +102,8 @@ public class ForwardingRulesManager implements
 
     private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
     private static final Logger logsync = LoggerFactory.getLogger("FRMsync");
-    private static final String PORTREMOVED = "Port removed";
-    private static final String NODEDOWN = "Node is Down";
+    private static final String PORT_REMOVED = "Port removed";
+    private static final String NODE_DOWN = "Node is Down";
     private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry";
     private String frmFileName;
     private String portGroupFileName;
@@ -160,6 +160,8 @@ public class ForwardingRulesManager implements
      */
     static final String WORK_ORDER_CACHE = "frm.workOrder";
     static final String WORK_STATUS_CACHE = "frm.workStatus";
+    static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView";
+    static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView";
 
     /*
      * Data structure responsible for distributing the FlowEntryInstall requests
@@ -603,8 +605,8 @@ public class ForwardingRulesManager implements
 
             // Update DB
             newEntries.setRequestId(status.getRequestId());
-            updateLocalDatabase(currentEntries, false);
-            updateLocalDatabase(newEntries, true);
+            updateSwViews(currentEntries, false);
+            updateSwViews(newEntries, true);
 
             return status;
         }
@@ -718,7 +720,7 @@ public class ForwardingRulesManager implements
             log.trace("Removed  {}", entry.getInstall());
 
             // Update DB
-            updateLocalDatabase(entry, false);
+            updateSwViews(entry, false);
 
             return status;
         }
@@ -770,7 +772,7 @@ public class ForwardingRulesManager implements
 
             // Update DB
             entry.setRequestId(status.getRequestId());
-            updateLocalDatabase(entry, true);
+            updateSwViews(entry, true);
 
             return status;
         }
@@ -815,10 +817,7 @@ public class ForwardingRulesManager implements
         return null;
     }
 
-    private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
-        // Update the software view
-        updateSwViewes(entry, add);
-
+    private void updateIndexDatabase(FlowEntryInstall entry, boolean add) {
         // Update node indexed flow database
         updateNodeFlowsDB(entry, add);
 
@@ -829,7 +828,7 @@ public class ForwardingRulesManager implements
     /*
      * Update the node mapped flows database
      */
-    private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) {
+    private void updateSwViews(FlowEntryInstall flowEntries, boolean add) {
         if (add) {
             originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal());
             installedSwView.put(flowEntries, flowEntries);
@@ -928,7 +927,7 @@ public class ForwardingRulesManager implements
 
         // Update DB
         if (status.isSuccess()) {
-            updateLocalDatabase(target, false);
+            updateSwViews(target, false);
         } else {
             // log the error
             log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
@@ -1171,8 +1170,6 @@ public class ForwardingRulesManager implements
     private void nonClusterObjectCreate() {
         originalSwView = new ConcurrentHashMap<FlowEntry, FlowEntry>();
         installedSwView = new ConcurrentHashMap<FlowEntryInstall, FlowEntryInstall>();
-        nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
-        groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
         TSPolicies = new ConcurrentHashMap<String, Object>();
         staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
         portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
@@ -1371,21 +1368,15 @@ public class ForwardingRulesManager implements
         log.debug("Allocating caches for Container {}", container.getName());
 
         try {
-            clusterContainerService.createCache("frm.originalSwView",
+            clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE,
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-            clusterContainerService.createCache("frm.installedSwView",
+            clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE,
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache("frm.inactiveFlows",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-            clusterContainerService.createCache("frm.nodeFlows",
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
-            clusterContainerService.createCache("frm.groupFlows",
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
             clusterContainerService.createCache("frm.staticFlows",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
@@ -1426,14 +1417,14 @@ public class ForwardingRulesManager implements
 
         log.debug("Retrieving Caches for Container {}", container.getName());
 
-        map = clusterContainerService.getCache("frm.originalSwView");
+        map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE);
         if (map != null) {
             originalSwView = (ConcurrentMap<FlowEntry, FlowEntry>) map;
         } else {
             log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName());
         }
 
-        map = clusterContainerService.getCache("frm.installedSwView");
+        map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE);
         if (map != null) {
             installedSwView = (ConcurrentMap<FlowEntryInstall, FlowEntryInstall>) map;
         } else {
@@ -1447,20 +1438,6 @@ public class ForwardingRulesManager implements
             log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName());
         }
 
-        map = clusterContainerService.getCache("frm.nodeFlows");
-        if (map != null) {
-            nodeFlows = (ConcurrentMap<Node, List<FlowEntryInstall>>) map;
-        } else {
-            log.error("Retrieval of cache failed for Container {}", container.getName());
-        }
-
-        map = clusterContainerService.getCache("frm.groupFlows");
-        if (map != null) {
-            groupFlows = (ConcurrentMap<String, List<FlowEntryInstall>>) map;
-        } else {
-            log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName());
-        }
-
         map = clusterContainerService.getCache("frm.staticFlows");
         if (map != null) {
             staticFlows = (ConcurrentMap<Integer, FlowConfig>) map;
@@ -1658,7 +1635,7 @@ public class ForwardingRulesManager implements
                     // Take note of this controller generated static flow
                     toRemove.add(entry.getKey());
                 } else {
-                    config.setStatus(NODEDOWN);
+                    config.setStatus(NODE_DOWN);
                 }
             }
         }
@@ -2169,7 +2146,7 @@ public class ForwardingRulesManager implements
             List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>(nodeFlows.get(node));
 
             for (FlowEntryInstall entry : toRemove) {
-                updateLocalDatabase(entry, false);
+                updateSwViews(entry, false);
             }
         }
     }
@@ -2293,7 +2270,7 @@ public class ForwardingRulesManager implements
                 if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
                     FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
                     if (flowConfig != null) {
-                        flowConfig.setStatus(PORTREMOVED);
+                        flowConfig.setStatus(PORT_REMOVED);
                         updated = true;
                     }
                 }
@@ -2488,6 +2465,9 @@ public class ForwardingRulesManager implements
             portGroupProvider.registerPortGroupChange(this);
         }
 
+        nodeFlows = new ConcurrentHashMap<Node, List<FlowEntryInstall>>();
+        groupFlows = new ConcurrentHashMap<String, List<FlowEntryInstall>>();
+
         cacheStartup();
 
         registerWithOSGIConsole();
@@ -2589,9 +2569,11 @@ public class ForwardingRulesManager implements
                              * flow merging is not an injective function
                              */
                             updateFlowsContainerFlow();
+                        } else if (event instanceof UpdateIndexDBs) {
+                            UpdateIndexDBs update = (UpdateIndexDBs)event;
+                            updateIndexDatabase(update.getFei(), update.isAddition());
                         } else {
-                            log.warn("Dequeued unknown event {}", event.getClass()
-                                    .getSimpleName());
+                            log.warn("Dequeued unknown event {}", event.getClass().getSimpleName());
                         }
                     } catch (InterruptedException e) {
                         // clear pending events
@@ -2878,6 +2860,38 @@ public class ForwardingRulesManager implements
         }
     }
 
+    private class UpdateIndexDBs extends FRMEvent {
+        private FlowEntryInstall fei;
+        private boolean add;
+
+        /**
+         *
+         * @param fei the flow entry which was installed/removed on the netwrok node
+         * @param update
+         */
+        UpdateIndexDBs(FlowEntryInstall fei, boolean add) {
+            this.fei = fei;
+            this.add = add;
+        }
+
+
+        /**
+         * @return the flowEntryInstall object which was added/removed
+         * to/from the installed software view cache
+         */
+        public FlowEntryInstall getFei() {
+            return fei;
+        }
+
+        /**
+         *
+         * @return whether this was an flow addition or removal
+         */
+        public boolean isAddition() {
+            return add;
+        }
+    }
+
     /*
      * OSGI COMMANDS
      */
@@ -3005,7 +3019,7 @@ public class ForwardingRulesManager implements
         }
 
         // Update software views
-        this.updateLocalDatabase(installedEntry, false);
+        this.updateSwViews(installedEntry, false);
     }
 
     @Override
@@ -3042,7 +3056,7 @@ public class ForwardingRulesManager implements
         }
         if (target != null) {
             // This was a flow install, update database
-            this.updateLocalDatabase(target, false);
+            this.updateSwViews(target, false);
             // also update the config
             if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
                 ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
@@ -3112,6 +3126,13 @@ public class ForwardingRulesManager implements
 
     @Override
     public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+        /*
+         * Streamline the updates for the per node and per group index databases
+         */
+        if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+            pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, true));
+        }
+
         if (originLocal) {
             /*
              * Local updates are of no interest
@@ -3163,8 +3184,11 @@ public class ForwardingRulesManager implements
     @Override
     public void entryDeleted(Object key, String cacheName, boolean originLocal) {
         /*
-         * Do nothing
+         * Streamline the updates for the per node and per group index databases
          */
+        if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) {
+            pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false));
+        }
     }
 
     /**