From: Alessandro Boch Date: Thu, 31 Oct 2013 19:46:15 +0000 (-0700) Subject: Bug 73: Thread safety in FRM X-Git-Tag: jenkins-controller-bulk-release-prepare-only-2-1~512 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F06%2F2306%2F2;hp=1154766753880dc4db6472c2b1c91555d4d0da99;p=controller.git Bug 73: Thread safety in FRM - Have nodeFlows and groupFlows non cluster caches - Update the two above indirectly on every installedSwView cache update (local or remote) - Have the update be performed by the FRM event thread only so that all the updates are streamlined Change-Id: Ie5fa6ebee9433058d4028cff51ed1369ebd7b2a2 Signed-off-by: Alessandro Boch --- diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java index 98dc686501..48651f7e60 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/Activator.java @@ -75,6 +75,7 @@ public class Activator extends ComponentActivatorAbstractBase { Set propSet = new HashSet(); propSet.add(ForwardingRulesManager.WORK_STATUS_CACHE); propSet.add(ForwardingRulesManager.WORK_ORDER_CACHE); + propSet.add(ForwardingRulesManager.INSTALLED_SW_VIEW_CACHE); props.put("cachenames", propSet); // export the service diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index 520825762a..665ba7c635 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -102,8 +102,8 @@ public class ForwardingRulesManager implements private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class); private static final Logger logsync = LoggerFactory.getLogger("FRMsync"); - private static final String PORTREMOVED = "Port removed"; - private static final String NODEDOWN = "Node is Down"; + private static final String PORT_REMOVED = "Port removed"; + private static final String NODE_DOWN = "Node is Down"; private static final String INVALID_FLOW_ENTRY = "Invalid FlowEntry"; private String frmFileName; private String portGroupFileName; @@ -160,6 +160,8 @@ public class ForwardingRulesManager implements */ static final String WORK_ORDER_CACHE = "frm.workOrder"; static final String WORK_STATUS_CACHE = "frm.workStatus"; + static final String ORIGINAL_SW_VIEW_CACHE = "frm.originalSwView"; + static final String INSTALLED_SW_VIEW_CACHE = "frm.installedSwView"; /* * Data structure responsible for distributing the FlowEntryInstall requests @@ -603,8 +605,8 @@ public class ForwardingRulesManager implements // Update DB newEntries.setRequestId(status.getRequestId()); - updateLocalDatabase(currentEntries, false); - updateLocalDatabase(newEntries, true); + updateSwViews(currentEntries, false); + updateSwViews(newEntries, true); return status; } @@ -718,7 +720,7 @@ public class ForwardingRulesManager implements log.trace("Removed {}", entry.getInstall()); // Update DB - updateLocalDatabase(entry, false); + updateSwViews(entry, false); return status; } @@ -770,7 +772,7 @@ public class ForwardingRulesManager implements // Update DB entry.setRequestId(status.getRequestId()); - updateLocalDatabase(entry, true); + updateSwViews(entry, true); return status; } @@ -815,10 +817,7 @@ public class ForwardingRulesManager implements return null; } - private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { - // Update the software view - updateSwViewes(entry, add); - + private void updateIndexDatabase(FlowEntryInstall entry, boolean add) { // Update node indexed flow database updateNodeFlowsDB(entry, add); @@ -829,7 +828,7 @@ public class ForwardingRulesManager implements /* * Update the node mapped flows database */ - private void updateSwViewes(FlowEntryInstall flowEntries, boolean add) { + private void updateSwViews(FlowEntryInstall flowEntries, boolean add) { if (add) { originalSwView.put(flowEntries.getOriginal(), flowEntries.getOriginal()); installedSwView.put(flowEntries, flowEntries); @@ -928,7 +927,7 @@ public class ForwardingRulesManager implements // Update DB if (status.isSuccess()) { - updateLocalDatabase(target, false); + updateSwViews(target, false); } else { // log the error log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(), @@ -1171,8 +1170,6 @@ public class ForwardingRulesManager implements private void nonClusterObjectCreate() { originalSwView = new ConcurrentHashMap(); installedSwView = new ConcurrentHashMap(); - nodeFlows = new ConcurrentHashMap>(); - groupFlows = new ConcurrentHashMap>(); TSPolicies = new ConcurrentHashMap(); staticFlowsOrdinal = new ConcurrentHashMap(); portGroupConfigs = new ConcurrentHashMap(); @@ -1371,21 +1368,15 @@ public class ForwardingRulesManager implements log.debug("Allocating caches for Container {}", container.getName()); try { - clusterContainerService.createCache("frm.originalSwView", + clusterContainerService.createCache(ORIGINAL_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.installedSwView", + clusterContainerService.createCache(INSTALLED_SW_VIEW_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("frm.inactiveFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.nodeFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - - clusterContainerService.createCache("frm.groupFlows", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -1426,14 +1417,14 @@ public class ForwardingRulesManager implements log.debug("Retrieving Caches for Container {}", container.getName()); - map = clusterContainerService.getCache("frm.originalSwView"); + map = clusterContainerService.getCache(ORIGINAL_SW_VIEW_CACHE); if (map != null) { originalSwView = (ConcurrentMap) map; } else { log.error("Retrieval of frm.originalSwView cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.installedSwView"); + map = clusterContainerService.getCache(INSTALLED_SW_VIEW_CACHE); if (map != null) { installedSwView = (ConcurrentMap) map; } else { @@ -1447,20 +1438,6 @@ public class ForwardingRulesManager implements log.error("Retrieval of frm.inactiveFlows cache failed for Container {}", container.getName()); } - map = clusterContainerService.getCache("frm.nodeFlows"); - if (map != null) { - nodeFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of cache failed for Container {}", container.getName()); - } - - map = clusterContainerService.getCache("frm.groupFlows"); - if (map != null) { - groupFlows = (ConcurrentMap>) map; - } else { - log.error("Retrieval of frm.groupFlows cache failed for Container {}", container.getName()); - } - map = clusterContainerService.getCache("frm.staticFlows"); if (map != null) { staticFlows = (ConcurrentMap) map; @@ -1658,7 +1635,7 @@ public class ForwardingRulesManager implements // Take note of this controller generated static flow toRemove.add(entry.getKey()); } else { - config.setStatus(NODEDOWN); + config.setStatus(NODE_DOWN); } } } @@ -2169,7 +2146,7 @@ public class ForwardingRulesManager implements List toRemove = new ArrayList(nodeFlows.get(node)); for (FlowEntryInstall entry : toRemove) { - updateLocalDatabase(entry, false); + updateSwViews(entry, false); } } } @@ -2293,7 +2270,7 @@ public class ForwardingRulesManager implements if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); if (flowConfig != null) { - flowConfig.setStatus(PORTREMOVED); + flowConfig.setStatus(PORT_REMOVED); updated = true; } } @@ -2488,6 +2465,9 @@ public class ForwardingRulesManager implements portGroupProvider.registerPortGroupChange(this); } + nodeFlows = new ConcurrentHashMap>(); + groupFlows = new ConcurrentHashMap>(); + cacheStartup(); registerWithOSGIConsole(); @@ -2589,9 +2569,11 @@ public class ForwardingRulesManager implements * flow merging is not an injective function */ updateFlowsContainerFlow(); + } else if (event instanceof UpdateIndexDBs) { + UpdateIndexDBs update = (UpdateIndexDBs)event; + updateIndexDatabase(update.getFei(), update.isAddition()); } else { - log.warn("Dequeued unknown event {}", event.getClass() - .getSimpleName()); + log.warn("Dequeued unknown event {}", event.getClass().getSimpleName()); } } catch (InterruptedException e) { // clear pending events @@ -2878,6 +2860,38 @@ public class ForwardingRulesManager implements } } + private class UpdateIndexDBs extends FRMEvent { + private FlowEntryInstall fei; + private boolean add; + + /** + * + * @param fei the flow entry which was installed/removed on the netwrok node + * @param update + */ + UpdateIndexDBs(FlowEntryInstall fei, boolean add) { + this.fei = fei; + this.add = add; + } + + + /** + * @return the flowEntryInstall object which was added/removed + * to/from the installed software view cache + */ + public FlowEntryInstall getFei() { + return fei; + } + + /** + * + * @return whether this was an flow addition or removal + */ + public boolean isAddition() { + return add; + } + } + /* * OSGI COMMANDS */ @@ -3005,7 +3019,7 @@ public class ForwardingRulesManager implements } // Update software views - this.updateLocalDatabase(installedEntry, false); + this.updateSwViews(installedEntry, false); } @Override @@ -3042,7 +3056,7 @@ public class ForwardingRulesManager implements } if (target != null) { // This was a flow install, update database - this.updateLocalDatabase(target, false); + this.updateSwViews(target, false); // also update the config if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) { ConcurrentMap.Entry staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode()); @@ -3112,6 +3126,13 @@ public class ForwardingRulesManager implements @Override public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + /* + * Streamline the updates for the per node and per group index databases + */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, true)); + } + if (originLocal) { /* * Local updates are of no interest @@ -3163,8 +3184,11 @@ public class ForwardingRulesManager implements @Override public void entryDeleted(Object key, String cacheName, boolean originLocal) { /* - * Do nothing + * Streamline the updates for the per node and per group index databases */ + if (cacheName.equals(INSTALLED_SW_VIEW_CACHE)) { + pendingEvents.offer(new UpdateIndexDBs((FlowEntryInstall)key, false)); + } } /**