X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fstatisticsmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fstatisticsmanager%2Finternal%2FStatisticsManager.java;h=5cd47f2f20a48cbf75774055267d0f2f90f6b751;hp=f5c13b61053ee5eaaa7eab5594608cd3da4ce5d7;hb=4cff32bcf72cca1bbc6f6a44eb70d7c159df38a6;hpb=eed57e2b0afd50823bc882123b6cbac04bcc48d9

diff --git a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java
index f5c13b6105..5cd47f2f20 100644
--- a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java
+++ b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java
@@ -10,6 +10,7 @@ package org.opendaylight.controller.statisticsmanager.internal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -19,12 +20,20 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
@@ -49,25 +58,49 @@ import org.slf4j.LoggerFactory;
  * The class caches latest network nodes statistics as notified by reader
  * services and provides API to retrieve them.
  */
-public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates {
+public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
+        ICacheUpdateAware<Object, Object> {
     private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
     private IContainer container;
     private IClusterContainerServices clusterContainerService;
     private IReadService reader;
+    private IConnectionManager connectionManager;
     //statistics caches
     private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
     private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
     private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
     private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
+
+    // data structure for latches
+    // this is not a cluster cache
+    private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
+    // 30 seconds is the timeout;
+    // the value can be tweaked based on performance tests
+    private static long latchTimeout = 30;
+
+    // cache for flow stats refresh triggers
+    // an entry added to this map triggers the statistics manager
+    // on the controller to which the node is connected to get the latest flow stats from that node
+    // this is a cluster cache
+    private ConcurrentMap<Integer, Node> triggers;
+
+    // use an atomic integer for the triggers key
+    private AtomicInteger triggerKey = new AtomicInteger();
+
+    // single thread executor for the triggers
+    private ExecutorService triggerExecutor;
+
+    static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
+    static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
+
     private void nonClusterObjectCreate() {
         flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
         nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
         tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
         descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
+        triggers = new ConcurrentHashMap<Integer, Node>();
     }
 
-    @SuppressWarnings("deprecation")
     private void allocateCaches() {
         if (clusterContainerService == null) {
             nonClusterObjectCreate();
@@ -76,22 +109,23 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         }
 
         try {
-            clusterContainerService.createCache("statisticsmanager.flowStatistics",
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
             clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
             clusterContainerService.createCache("statisticsmanager.tableStatistics",
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
             clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
-                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
-
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            clusterContainerService.createCache(TRIGGERS_CACHE,
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
         } catch (CacheConfigException cce) {
             log.error("Statistics cache configuration invalid - check cache mode");
         } catch (CacheExistException ce) {
             log.debug("Skipping statistics cache creation - already present");
         }
     }
 
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     private void retrieveCaches() {
         ConcurrentMap<?, ?> map;
 
@@ -102,7 +136,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         log.debug("Statistics Manager - retrieveCaches for Container {}", container);
 
-        map = clusterContainerService.getCache("statisticsmanager.flowStatistics");
+        map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
         if (map != null) {
             this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
         } else {
@@ -129,6 +163,13 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         } else {
             log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
         }
+
+        map = clusterContainerService.getCache(TRIGGERS_CACHE);
+        if (map != null) {
+            this.triggers = (ConcurrentMap<Integer, Node>) map;
+        } else {
+            log.error("Cache allocation failed for " + TRIGGERS_CACHE + " in container {}", container.getName());
+        }
     }
 
     /**
@@ -161,26 +202,39 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
      */
     void start() {
         log.debug("START called!");
+        this.triggerExecutor = Executors.newSingleThreadExecutor();
     }
 
     /**
      * Function called after registering the service in OSGi service registry.
      */
     void started(){
-        //retrieve current statistics so we don't have to wait for next refresh
+        // Retrieve current statistics so we don't have to wait for next refresh
         ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
                 ISwitchManager.class, container.getName(), this);
-        if (reader != null && switchManager != null) {
+        if ((reader != null) && (switchManager != null)) {
             Set<Node> nodeSet = switchManager.getNodes();
             for (Node node : nodeSet) {
-                flowStatistics.put(node, reader.readAllFlows(node));
-                descriptionStatistics.put(node, reader.readDescription(node));
-                tableStatistics.put(node, reader.readNodeTable(node));
-                nodeConnectorStatistics.put(node, reader.readNodeConnectors(node));
+                List<FlowOnNode> flows = reader.readAllFlows(node);
+                if (flows != null) {
+                    flowStatistics.put(node, flows);
+                }
+                NodeDescription descr = reader.readDescription(node);
+                if (descr != null) {
+                    descriptionStatistics.put(node, descr);
+                }
+                List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
+                if (tableStats != null) {
+                    tableStatistics.put(node, tableStats);
+                }
+                List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
+                if (ncStats != null) {
+                    nodeConnectorStatistics.put(node, ncStats);
+                }
             }
         } else {
-            log.warn("Failed to retrieve current statistics. Statistics will not be immidiately available!");
+            log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
         }
     }
 
@@ -192,6 +246,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
      */
     void stop() {
         log.debug("STOP called!");
+        this.triggerExecutor.shutdownNow();
     }
 
     void setClusterContainerService(IClusterContainerServices s) {
@@ -227,7 +282,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     @Override
     public List<FlowOnNode> getFlows(Node node) {
         if (node == null) {
-            return null;
+            return Collections.emptyList();
         }
 
         List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
@@ -238,6 +293,62 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         return flowList;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<FlowOnNode> getFlowsNoCache(Node node) {
+        if (node == null) {
+            return Collections.emptyList();
+        }
+        // check if the node is local to this controller
+        ConnectionLocality locality = ConnectionLocality.LOCAL;
+        if (this.connectionManager != null) {
+            locality = this.connectionManager.getLocalityStatus(node);
+        }
+        if (locality == ConnectionLocality.NOT_LOCAL) {
+            // send a trigger to all cluster members and wait for either a response or a timeout
+            CountDownLatch newLatch = new CountDownLatch(1);
+            CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
+            this.triggers.put(this.triggerKey.incrementAndGet(), node);
+            try {
+                boolean retStatus;
+                if (oldLatch != null) {
+                    retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
+                } else {
+                    retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
+                }
+                // log the return status, as it tells us whether
+                // the latch timed out
+                log.debug("latch timed out {}", !retStatus);
+            } catch (InterruptedException e) {
+                // log the error and move on
+                log.warn("Waiting for statistics response interrupted", e);
+                // restore the interrupt status;
+                // it's good practice to restore the interrupt status
+                // if you are not propagating the InterruptedException
+                Thread.currentThread().interrupt();
+            }
+            // now that the wait is over,
+            // remove the latch entry
+            this.latches.remove(node);
+        } else {
+            // the node is local,
+            // call the read service
+            if (this.reader != null) {
+                List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
+                if (flows != null) {
+                    nodeFlowStatisticsUpdated(node, flows);
+                }
+            }
+        }
+        // at this point we are ready to return the cached value;
+        // it will be up to date with a very high probability,
+        // because we have either sent a trigger for a cache update
+        // or refreshed the cache ourselves if the node is local
+        return getFlows(node);
+    }
+
     @Override
     public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
         Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
@@ -247,7 +358,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         }
 
         Node node;
-        //index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
+        // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
         Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
         for (FlowEntry flowEntry : flowList) {
             node = flowEntry.getNode();
@@ -256,7 +367,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
             index.put(node, set);
         }
 
-        //iterate over flows per indexed node and add to output
+        // Iterate over flows per indexed node and add to output
         for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
             node = indexEntry.getKey();
             List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
@@ -314,7 +425,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     @Override
     public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
         if (node == null){
-            return null;
+            return Collections.emptyList();
         }
 
         List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
@@ -344,7 +455,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     @Override
     public List<NodeTableStatistics> getNodeTableStatistics(Node node){
         if (node == null){
-            return null;
+            return Collections.emptyList();
         }
         List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
         List<NodeTableStatistics> cachedList = tableStatistics.get(node);
@@ -356,27 +467,37 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     @Override
     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        // No equality check because duration fields change constantly
         this.flowStatistics.put(node, flowStatsList);
     }
 
     @Override
     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
-        this.nodeConnectorStatistics.put(node, ncStatsList);
+        List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
+        if (! ncStatsList.equals(currentStat)){
+            this.nodeConnectorStatistics.put(node, ncStatsList);
+        }
     }
 
     @Override
     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
-        this.tableStatistics.put(node, tableStatsList);
+        List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
+        if (! tableStatsList.equals(currentStat)) {
+            this.tableStatistics.put(node, tableStatsList);
+        }
     }
 
     @Override
     public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
-        this.descriptionStatistics.put(node, nodeDescription);
+        NodeDescription currentDesc = this.descriptionStatistics.get(node);
+        if (! nodeDescription.equals(currentDesc)){
+            this.descriptionStatistics.put(node, nodeDescription);
+        }
     }
 
     @Override
     public void updateNode(Node node, UpdateType type, Set<Property> props) {
-        //if node is removed, remove stats mappings
+        // If node is removed, clean up stats mappings
         if (type == UpdateType.REMOVED) {
             flowStatistics.remove(node);
             nodeConnectorStatistics.remove(node);
@@ -387,6 +508,87 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
 
     @Override
     public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
-        // not interested in this update
+        // Not interested in this update
+    }
+
+    public void unsetIConnectionManager(IConnectionManager s) {
+        if (s == this.connectionManager) {
+            this.connectionManager = null;
+        }
+    }
+
+    public void setIConnectionManager(IConnectionManager s) {
+        this.connectionManager = s;
+    }
+
+    @Override
+    public void entryCreated(Object key, String cacheName, boolean originLocal) {
+        /*
+         * Do nothing
+         */
+    }
+
+    @Override
+    public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+        if (originLocal) {
+            /*
+             * Local updates are of no interest
+             */
+            return;
+        }
+        if (cacheName.equals(TRIGGERS_CACHE)) {
+            log.trace("Got a trigger for key {} : value {}", key, new_value);
+            final Node n = (Node) new_value;
+            // check if the node is local to this controller
+            ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
+            if (this.connectionManager != null) {
+                locality = this.connectionManager.getLocalityStatus(n);
+            }
+            if (locality == ConnectionLocality.LOCAL) {
+                log.trace("trigger for node {} is processed locally", n);
+                // delete the trigger and proceed with handling it
+                this.triggers.remove(key);
+                // this is a potentially long running task;
+                // off-load it from the listener thread
+                Runnable r = new Runnable() {
+                    @Override
+                    public void run() {
+                        // the node is local,
+                        // call the read service
+                        if (reader != null) {
+                            List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
+                            if (flows != null) {
+                                flowStatistics.put(n, flows);
+                            }
+                        }
+                    }
+                };
+                // submit the runnable for execution
+                if (this.triggerExecutor != null) {
+                    this.triggerExecutor.execute(r);
+                }
+            }
+        } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
+            // flow statistics cache updated,
+            // get the node
+            log.trace("Got a flow statistics cache update for key {}", key);
+            // this is a short running task,
+            // no need to off-load it from the listener thread
+            final Node n = (Node) key;
+            // check if an outstanding trigger exists for this node
+            CountDownLatch l = this.latches.get(n);
+            if (l != null) {
+                // someone was waiting for this update;
+                // let them know
+                l.countDown();
+            }
+        }
+    }
+
+    @Override
+    public void entryDeleted(Object key, String cacheName, boolean originLocal) {
+        /*
+         * Do nothing
+         */
+    }
 }
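
For context on the mechanism this patch introduces: getFlowsNoCache() combines a replicated trigger cache with per-node CountDownLatches, so a controller that does not own a node can ask the owning controller to refresh the shared flow-statistics cache and then wait briefly for that refresh. The sketch below is illustrative only and is not part of the patch: the class name, the String node keys, the plain maps standing in for the clustered caches, and the 5-second timeout are all hypothetical stand-ins, and the "remote" controller is simulated by a background thread instead of a real IReadService.nonCachedReadAllFlows() call on another cluster member.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal, single-JVM sketch of the trigger/latch handshake (all names hypothetical). */
public class TriggerLatchSketch {
    // stand-ins for the cluster caches and the local latch map used by the patch
    private final ConcurrentMap<Integer, String> triggers = new ConcurrentHashMap<Integer, String>();
    private final ConcurrentMap<String, CountDownLatch> latches = new ConcurrentHashMap<String, CountDownLatch>();
    private final ConcurrentMap<String, String> statistics = new ConcurrentHashMap<String, String>();
    private final AtomicInteger triggerKey = new AtomicInteger();
    // plays the role of the remote controller's listener thread
    private final ExecutorService remoteSide = Executors.newSingleThreadExecutor();

    /** Requester side: publish a trigger for 'node', then wait for the owner to refresh. */
    public String refreshAndGet(String node) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CountDownLatch existing = latches.putIfAbsent(node, latch);
        CountDownLatch toAwait = (existing != null) ? existing : latch;

        publishTrigger(node); // the patch does this.triggers.put(key, node) on the replicated cache

        boolean refreshed = toAwait.await(5, TimeUnit.SECONDS); // the patch waits up to 30 seconds
        latches.remove(node);
        if (!refreshed) {
            System.out.println("timed out; returning whatever is cached");
        }
        return statistics.get(node);
    }

    /** "Remote" side: reacts to the trigger, refreshes the shared cache, and wakes the waiter. */
    private void publishTrigger(final String node) {
        triggers.put(triggerKey.incrementAndGet(), node);
        remoteSide.execute(new Runnable() {
            @Override
            public void run() {
                // refresh the shared cache (stands in for nonCachedReadAllFlows + put)
                statistics.put(node, "fresh stats for " + node);
                // count down the latch of whoever is waiting on this node
                CountDownLatch l = latches.get(node);
                if (l != null) {
                    l.countDown();
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        TriggerLatchSketch sketch = new TriggerLatchSketch();
        System.out.println(sketch.refreshAndGet("openflow:1"));
        sketch.remoteSide.shutdown();
    }
}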