X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fstatisticsmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fstatisticsmanager%2Finternal%2FStatisticsManager.java;h=92ed44efbbde51e80046048f3d4afe9e1a4de56f;hb=6e9d5f469bf492778360a7400861e1d839737f87;hp=02b6251e9f7076073b576b398df18a641d690582;hpb=22ef1dee1ecf9d749f6b41b2bb09e915bfc76062;p=controller.git diff --git a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java index 02b6251e9f..92ed44efbb 100644 --- a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java +++ b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.statisticsmanager.internal; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -19,12 +20,20 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.core.IContainer; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; @@ -49,22 +58,47 @@ import org.slf4j.LoggerFactory; * The class caches latest network nodes statistics as notified by reader * services and provides API to retrieve them. */ -public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates { +public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates, + ICacheUpdateAware { private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class); private IContainer container; private IClusterContainerServices clusterContainerService; private IReadService reader; + private IConnectionManager connectionManager; //statistics caches private ConcurrentMap> flowStatistics; private ConcurrentMap> nodeConnectorStatistics; private ConcurrentMap> tableStatistics; private ConcurrentMap descriptionStatistics; + // data structure for latches + // this is not a cluster cache + private ConcurrentMap latches = new ConcurrentHashMap(); + // 30 seconds is the timeout. + // the value of this can be tweaked based on performance tests. + private static long latchTimeout = 30; + + // cache for flow stats refresh triggers + // an entry added to this map triggers the statistics manager + // to which the node is connected to get the latest flow stats from that node + // this is a cluster cache + private ConcurrentMap triggers; + + // use an atomic integer for the triggers key + private AtomicInteger triggerKey = new AtomicInteger(); + + // single thread executor for the triggers + private ExecutorService triggerExecutor; + + static final String TRIGGERS_CACHE = "statisticsmanager.triggers"; + static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics"; + private void nonClusterObjectCreate() { flowStatistics = new ConcurrentHashMap>(); nodeConnectorStatistics = new ConcurrentHashMap>(); tableStatistics = new ConcurrentHashMap>(); descriptionStatistics = new ConcurrentHashMap(); + triggers = new ConcurrentHashMap(); } @SuppressWarnings("deprecation") @@ -76,7 +110,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen } try { - clusterContainerService.createCache("statisticsmanager.flowStatistics", + clusterContainerService.createCache(FLOW_STATISTICS_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -84,7 +118,8 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("statisticsmanager.descriptionStatistics", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - + clusterContainerService.createCache(TRIGGERS_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); } catch (CacheConfigException cce) { log.error("Statistics cache configuration invalid - check cache mode"); } catch (CacheExistException ce) { @@ -102,7 +137,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen log.debug("Statistics Manager - retrieveCaches for Container {}", container); - map = clusterContainerService.getCache("statisticsmanager.flowStatistics"); + map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE); if (map != null) { this.flowStatistics = (ConcurrentMap>) map; } else { @@ -129,6 +164,13 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen } else { log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName()); } + + map = clusterContainerService.getCache(TRIGGERS_CACHE); + if (map != null) { + this.triggers = (ConcurrentMap) map; + } else { + log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName()); + } } /** @@ -161,6 +203,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen */ void start() { log.debug("START called!"); + this.triggerExecutor = Executors.newSingleThreadExecutor(); } /** @@ -204,6 +247,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen */ void stop() { log.debug("STOP called!"); + this.triggerExecutor.shutdownNow(); } void setClusterContainerService(IClusterContainerServices s) { @@ -239,7 +283,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen @Override public List getFlows(Node node) { if (node == null) { - return null; + return Collections.emptyList(); } List flowList = new ArrayList(); @@ -250,6 +294,62 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen return flowList; } + /** + * {@inheritDoc} + */ + @Override + public List getFlowsNoCache(Node node) { + if (node == null) { + return Collections.emptyList(); + } + // check if the node is local to this controller + ConnectionLocality locality = ConnectionLocality.LOCAL; + if(this.connectionManager != null) { + locality = this.connectionManager.getLocalityStatus(node); + } + if (locality == ConnectionLocality.NOT_LOCAL) { + // send a trigger to all and wait for either a response or timeout + CountDownLatch newLatch = new CountDownLatch(1); + CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch); + this.triggers.put(this.triggerKey.incrementAndGet(), node); + try { + boolean retStatus; + if(oldLatch != null) { + retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS); + } else { + retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS); + } + // log the return code as it will give us, if + // the latch timed out. + log.debug("latch timed out {}", !retStatus); + } catch (InterruptedException e) { + // log the error and move on + log.warn("Waiting for statistics response interrupted", e); + // restore the interrupt status + // its a good practice to restore the interrupt status + // if you are not propagating the InterruptedException + Thread.currentThread().interrupt(); + } + // now that the wait is over + // remove the latch entry + this.latches.remove(node); + } else { + // the node is local. + // call the read service + if (this.reader != null) { + List flows = reader.nonCachedReadAllFlows(node); + if (flows != null) { + nodeFlowStatisticsUpdated(node, flows); + } + } + } + // at this point we are ready to return the cached value. + // this cached value will be up to date with a very high probability + // due to what we have done previously ie:- send a trigger for cache update + // or refreshed the cache if the node is local. + return getFlows(node); + } + @Override public Map> getFlowStatisticsForFlowList(List flowList) { Map> statMapOutput = new HashMap>(); @@ -414,4 +514,85 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { // Not interested in this update } + + public void unsetIConnectionManager(IConnectionManager s) { + if (s == this.connectionManager) { + this.connectionManager = null; + } + } + + public void setIConnectionManager(IConnectionManager s) { + this.connectionManager = s; + } + + @Override + public void entryCreated(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } + + @Override + public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + if (originLocal) { + /* + * Local updates are of no interest + */ + return; + } + if (cacheName.equals(TRIGGERS_CACHE)) { + log.trace("Got a trigger for key {} : value {}", key, new_value); + final Node n = (Node) new_value; + // check if the node is local to this controller + ConnectionLocality locality = ConnectionLocality.NOT_LOCAL; + if(this.connectionManager != null) { + locality = this.connectionManager.getLocalityStatus(n); + } + if (locality == ConnectionLocality.LOCAL) { + log.trace("trigger for node {} processes locally", n); + // delete the trigger and proceed with handling the trigger + this.triggers.remove(key); + // this is a potentially long running task + // off load it from the listener thread + Runnable r = new Runnable() { + @Override + public void run() { + // the node is local. + // call the read service + if (reader != null) { + List flows = reader.nonCachedReadAllFlows(n); + if (flows != null) { + flowStatistics.put(n, flows); + } + } + } + }; + // submit the runnable for execution + if(this.triggerExecutor != null) { + this.triggerExecutor.execute(r); + } + } + } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) { + // flow statistics cache updated + // get the node + log.trace("Got a flow statistics cache update for key {}", key); + // this is a short running task + // no need of off loading from the listener thread + final Node n = (Node) key; + // check if an outstanding trigger exists for this node + CountDownLatch l = this.latches.get(n); + if(l != null) { + // someone was waiting for this update + // let him know + l.countDown(); + } + } + } + + @Override + public void entryDeleted(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } }