From: Asad Ahmed Date: Wed, 23 Oct 2013 23:57:41 +0000 (-0700) Subject: Added method for getting non-cached flow statistics X-Git-Tag: jenkins-controller-bulk-release-prepare-only-2-1~579 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=6e9d5f469bf492778360a7400861e1d839737f87;ds=sidebyside Added method for getting non-cached flow statistics Change-Id: I55f5846e35caca50dd638113fe77882b7213d221 Signed-off-by: Asad Ahmed --- diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 11373d9683..525d685bee 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -95,6 +95,7 @@ jacoco java 0.5.0-SNAPSHOT + 0.5.0-SNAPSHOT diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index e7a612d41b..5927d26bd7 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -411,7 +411,7 @@ org.opendaylight.controller statisticsmanager - ${controller.version} + ${statisticsmanager.version} org.opendaylight.controller diff --git a/opendaylight/northbound/integrationtest/pom.xml b/opendaylight/northbound/integrationtest/pom.xml index 04c152ad59..7bb06c461d 100644 --- a/opendaylight/northbound/integrationtest/pom.xml +++ b/opendaylight/northbound/integrationtest/pom.xml @@ -143,7 +143,7 @@ org.opendaylight.controller statisticsmanager - 0.4.1-SNAPSHOT + 0.5.0-SNAPSHOT org.opendaylight.controller diff --git a/opendaylight/statisticsmanager/api/pom.xml b/opendaylight/statisticsmanager/api/pom.xml index 9b775eaa92..0dca7396e7 100644 --- a/opendaylight/statisticsmanager/api/pom.xml +++ b/opendaylight/statisticsmanager/api/pom.xml @@ -15,7 +15,7 @@ statisticsmanager - 0.4.1-SNAPSHOT + 0.5.0-SNAPSHOT bundle diff --git a/opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java b/opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java index 8267e4d029..c717ccf208 100644 --- a/opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java +++ b/opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java @@ -39,6 +39,17 @@ public interface IStatisticsManager { */ List getFlows(Node node); + /** + * Same as the getFlows method. + * The only difference is that this method does not return cached flows. + * It will always make a request to the node to get all the flows for that node. + * If the request times out or gets an error, we revert to getting the cached flows. + * @see IStatisticsManager#getFlows + * @param node + * @return List of flows installed on the network node. + */ + List getFlowsNoCache(Node node); + /** * Returns the statistics for the flows specified in the list * diff --git a/opendaylight/statisticsmanager/implementation/pom.xml b/opendaylight/statisticsmanager/implementation/pom.xml index 5d82c5c5db..7281b9a71f 100644 --- a/opendaylight/statisticsmanager/implementation/pom.xml +++ b/opendaylight/statisticsmanager/implementation/pom.xml @@ -51,9 +51,11 @@ org.slf4j, org.opendaylight.controller.sal.inventory, org.opendaylight.controller.sal.match, + org.opendaylight.controller.sal.connection, org.opendaylight.controller.switchmanager, org.opendaylight.controller.statisticsmanager, org.opendaylight.controller.forwardingrulesmanager, + org.opendaylight.controller.connectionmanager, org.apache.felix.dm @@ -93,7 +95,7 @@ org.opendaylight.controller statisticsmanager - 0.4.1-SNAPSHOT + 0.5.0-SNAPSHOT org.opendaylight.controller @@ -115,6 +117,11 @@ clustering.services 0.4.1-SNAPSHOT + + org.opendaylight.controller + connectionmanager + 0.1.1-SNAPSHOT + junit junit diff --git a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java index 5afaaa6916..7eb32b05f7 100644 --- a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java +++ b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java @@ -9,8 +9,15 @@ package org.opendaylight.controller.statisticsmanager.internal; +import java.util.Dictionary; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Set; + import org.apache.felix.dm.Component; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; +import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; import org.opendaylight.controller.sal.core.IContainer; import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates; @@ -56,10 +63,20 @@ public class Activator extends ComponentActivatorAbstractBase { public void configureInstance(Component c, Object imp, String containerName) { if (imp.equals(StatisticsManager.class)) { // export the service - c.setInterface(new String[] { + Dictionary props = new Hashtable(); + Set propSet = new HashSet(); + // trigger cache + propSet.add(StatisticsManager.TRIGGERS_CACHE); + // flow statistics cache + propSet.add(StatisticsManager.FLOW_STATISTICS_CACHE); + props.put("cachenames", propSet); + + String interfaces[] = new String[] { IStatisticsManager.class.getName(), IReadServiceListener.class.getName(), - IListenInventoryUpdates.class.getName() }, null); + IListenInventoryUpdates.class.getName(), + ICacheUpdateAware.class.getName() }; + c.setInterface(interfaces, props); c.add(createContainerServiceDependency(containerName).setService(IReadService.class) .setCallbacks("setReaderService", "unsetReaderService").setRequired(true)); @@ -67,6 +84,8 @@ public class Activator extends ComponentActivatorAbstractBase { .setCallbacks("setClusterContainerService", "unsetClusterContainerService").setRequired(true)); c.add(createContainerServiceDependency(containerName).setService(IContainer.class) .setCallbacks("setIContainer", "unsetIContainer").setRequired(true)); + c.add(createServiceDependency().setService(IConnectionManager.class) + .setCallbacks("setIConnectionManager", "unsetIConnectionManager").setRequired(false)); } } diff --git a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java index 02b6251e9f..92ed44efbb 100644 --- a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java +++ b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.statisticsmanager.internal; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -19,12 +20,20 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.core.IContainer; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; @@ -49,22 +58,47 @@ import org.slf4j.LoggerFactory; * The class caches latest network nodes statistics as notified by reader * services and provides API to retrieve them. */ -public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates { +public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates, + ICacheUpdateAware { private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class); private IContainer container; private IClusterContainerServices clusterContainerService; private IReadService reader; + private IConnectionManager connectionManager; //statistics caches private ConcurrentMap> flowStatistics; private ConcurrentMap> nodeConnectorStatistics; private ConcurrentMap> tableStatistics; private ConcurrentMap descriptionStatistics; + // data structure for latches + // this is not a cluster cache + private ConcurrentMap latches = new ConcurrentHashMap(); + // 30 seconds is the timeout. + // the value of this can be tweaked based on performance tests. + private static long latchTimeout = 30; + + // cache for flow stats refresh triggers + // an entry added to this map triggers the statistics manager + // to which the node is connected to get the latest flow stats from that node + // this is a cluster cache + private ConcurrentMap triggers; + + // use an atomic integer for the triggers key + private AtomicInteger triggerKey = new AtomicInteger(); + + // single thread executor for the triggers + private ExecutorService triggerExecutor; + + static final String TRIGGERS_CACHE = "statisticsmanager.triggers"; + static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics"; + private void nonClusterObjectCreate() { flowStatistics = new ConcurrentHashMap>(); nodeConnectorStatistics = new ConcurrentHashMap>(); tableStatistics = new ConcurrentHashMap>(); descriptionStatistics = new ConcurrentHashMap(); + triggers = new ConcurrentHashMap(); } @SuppressWarnings("deprecation") @@ -76,7 +110,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen } try { - clusterContainerService.createCache("statisticsmanager.flowStatistics", + clusterContainerService.createCache(FLOW_STATISTICS_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -84,7 +118,8 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache("statisticsmanager.descriptionStatistics", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - + clusterContainerService.createCache(TRIGGERS_CACHE, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC)); } catch (CacheConfigException cce) { log.error("Statistics cache configuration invalid - check cache mode"); } catch (CacheExistException ce) { @@ -102,7 +137,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen log.debug("Statistics Manager - retrieveCaches for Container {}", container); - map = clusterContainerService.getCache("statisticsmanager.flowStatistics"); + map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE); if (map != null) { this.flowStatistics = (ConcurrentMap>) map; } else { @@ -129,6 +164,13 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen } else { log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName()); } + + map = clusterContainerService.getCache(TRIGGERS_CACHE); + if (map != null) { + this.triggers = (ConcurrentMap) map; + } else { + log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName()); + } } /** @@ -161,6 +203,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen */ void start() { log.debug("START called!"); + this.triggerExecutor = Executors.newSingleThreadExecutor(); } /** @@ -204,6 +247,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen */ void stop() { log.debug("STOP called!"); + this.triggerExecutor.shutdownNow(); } void setClusterContainerService(IClusterContainerServices s) { @@ -239,7 +283,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen @Override public List getFlows(Node node) { if (node == null) { - return null; + return Collections.emptyList(); } List flowList = new ArrayList(); @@ -250,6 +294,62 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen return flowList; } + /** + * {@inheritDoc} + */ + @Override + public List getFlowsNoCache(Node node) { + if (node == null) { + return Collections.emptyList(); + } + // check if the node is local to this controller + ConnectionLocality locality = ConnectionLocality.LOCAL; + if(this.connectionManager != null) { + locality = this.connectionManager.getLocalityStatus(node); + } + if (locality == ConnectionLocality.NOT_LOCAL) { + // send a trigger to all and wait for either a response or timeout + CountDownLatch newLatch = new CountDownLatch(1); + CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch); + this.triggers.put(this.triggerKey.incrementAndGet(), node); + try { + boolean retStatus; + if(oldLatch != null) { + retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS); + } else { + retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS); + } + // log the return code as it will give us, if + // the latch timed out. + log.debug("latch timed out {}", !retStatus); + } catch (InterruptedException e) { + // log the error and move on + log.warn("Waiting for statistics response interrupted", e); + // restore the interrupt status + // its a good practice to restore the interrupt status + // if you are not propagating the InterruptedException + Thread.currentThread().interrupt(); + } + // now that the wait is over + // remove the latch entry + this.latches.remove(node); + } else { + // the node is local. + // call the read service + if (this.reader != null) { + List flows = reader.nonCachedReadAllFlows(node); + if (flows != null) { + nodeFlowStatisticsUpdated(node, flows); + } + } + } + // at this point we are ready to return the cached value. + // this cached value will be up to date with a very high probability + // due to what we have done previously ie:- send a trigger for cache update + // or refreshed the cache if the node is local. + return getFlows(node); + } + @Override public Map> getFlowStatisticsForFlowList(List flowList) { Map> statMapOutput = new HashMap>(); @@ -414,4 +514,85 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { // Not interested in this update } + + public void unsetIConnectionManager(IConnectionManager s) { + if (s == this.connectionManager) { + this.connectionManager = null; + } + } + + public void setIConnectionManager(IConnectionManager s) { + this.connectionManager = s; + } + + @Override + public void entryCreated(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } + + @Override + public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) { + if (originLocal) { + /* + * Local updates are of no interest + */ + return; + } + if (cacheName.equals(TRIGGERS_CACHE)) { + log.trace("Got a trigger for key {} : value {}", key, new_value); + final Node n = (Node) new_value; + // check if the node is local to this controller + ConnectionLocality locality = ConnectionLocality.NOT_LOCAL; + if(this.connectionManager != null) { + locality = this.connectionManager.getLocalityStatus(n); + } + if (locality == ConnectionLocality.LOCAL) { + log.trace("trigger for node {} processes locally", n); + // delete the trigger and proceed with handling the trigger + this.triggers.remove(key); + // this is a potentially long running task + // off load it from the listener thread + Runnable r = new Runnable() { + @Override + public void run() { + // the node is local. + // call the read service + if (reader != null) { + List flows = reader.nonCachedReadAllFlows(n); + if (flows != null) { + flowStatistics.put(n, flows); + } + } + } + }; + // submit the runnable for execution + if(this.triggerExecutor != null) { + this.triggerExecutor.execute(r); + } + } + } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) { + // flow statistics cache updated + // get the node + log.trace("Got a flow statistics cache update for key {}", key); + // this is a short running task + // no need of off loading from the listener thread + final Node n = (Node) key; + // check if an outstanding trigger exists for this node + CountDownLatch l = this.latches.get(n); + if(l != null) { + // someone was waiting for this update + // let him know + l.countDown(); + } + } + } + + @Override + public void entryDeleted(Object key, String cacheName, boolean originLocal) { + /* + * Do nothing + */ + } } diff --git a/opendaylight/statisticsmanager/integrationtest/pom.xml b/opendaylight/statisticsmanager/integrationtest/pom.xml index 915717da70..c8b458ea77 100644 --- a/opendaylight/statisticsmanager/integrationtest/pom.xml +++ b/opendaylight/statisticsmanager/integrationtest/pom.xml @@ -60,7 +60,7 @@ org.opendaylight.controller statisticsmanager - 0.4.1-SNAPSHOT + 0.5.0-SNAPSHOT org.opendaylight.controller @@ -97,6 +97,26 @@ topologymanager 0.4.1-SNAPSHOT + + org.opendaylight.controller + connectionmanager + 0.1.1-SNAPSHOT + + + org.opendaylight.controller + connectionmanager.implementation + 0.1.1-SNAPSHOT + + + org.opendaylight.controller + sal.connection + 0.1.1-SNAPSHOT + + + org.opendaylight.controller + sal.connection.implementation + 0.1.1-SNAPSHOT + junit junit diff --git a/opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java b/opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java index 457e99dcd4..292f4cf8d3 100644 --- a/opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java +++ b/opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java @@ -116,6 +116,14 @@ public class StatisticsManagerIT { .versionAsInProject(), mavenBundle("org.opendaylight.controller", "forwardingrulesmanager") .versionAsInProject(), + mavenBundle("org.opendaylight.controller", "connectionmanager.implementation"). + versionAsInProject(), + mavenBundle("org.opendaylight.controller", "connectionmanager"). + versionAsInProject(), + mavenBundle("org.opendaylight.controller", "sal.connection"). + versionAsInProject(), + mavenBundle("org.opendaylight.controller", "sal.connection.implementation"). + versionAsInProject(), // needed by hosttracker mavenBundle("org.opendaylight.controller", "topologymanager")