Added method for getting non-cached flow statistics 20/2120/1
authorAsad Ahmed <asaahmed@cisco.com>
Wed, 23 Oct 2013 23:57:41 +0000 (16:57 -0700)
committerAsad Ahmed <asaahmed@cisco.com>
Wed, 23 Oct 2013 23:57:41 +0000 (16:57 -0700)
Change-Id: I55f5846e35caca50dd638113fe77882b7213d221
Signed-off-by: Asad Ahmed <asaahmed@cisco.com>
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/northbound/integrationtest/pom.xml
opendaylight/statisticsmanager/api/pom.xml
opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java
opendaylight/statisticsmanager/implementation/pom.xml
opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java
opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java
opendaylight/statisticsmanager/integrationtest/pom.xml
opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java

index 11373d968391ee58d112731f6a6c4e4acf09306b..525d685bee40056dd288d70439b4ffded732ea83 100644 (file)
@@ -95,6 +95,7 @@
     <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
     <sonar.language>java</sonar.language>
     <forwardingrulesmanager.version>0.5.0-SNAPSHOT</forwardingrulesmanager.version>
+    <statisticsmanager.version>0.5.0-SNAPSHOT</statisticsmanager.version>
   </properties>
 
   <dependencyManagement>
index e7a612d41b887b01bdb503650e277a916770e055..5927d26bd714bc4bc08c0a80cdc86f28616e057a 100644 (file)
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>statisticsmanager</artifactId>
-      <version>${controller.version}</version>
+      <version>${statisticsmanager.version}</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
index 04c152ad59ea164ad9bd7ef8eadbe2d820fc3d96..7bb06c461d57868496d0498194f48d9535699752 100644 (file)
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>statisticsmanager</artifactId>
-      <version>0.4.1-SNAPSHOT</version>
+      <version>0.5.0-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
index 9b775eaa9286f029aa9686ab4c65ac2004e1b7bf..0dca7396e71ead84841bee31b4c9f2db8708aaab 100644 (file)
@@ -15,7 +15,7 @@
   </scm>
 
   <artifactId>statisticsmanager</artifactId>
-  <version>0.4.1-SNAPSHOT</version>
+  <version>0.5.0-SNAPSHOT</version>
   <packaging>bundle</packaging>
   <build>
     <plugins>
index 8267e4d0299334a534a9844d759392a1e763d15f..c717ccf20818b02e39d638cda0a3634e8affae4e 100644 (file)
@@ -39,6 +39,17 @@ public interface IStatisticsManager {
      */
     List<FlowOnNode> getFlows(Node node);
 
+    /**
+     * Same as the getFlows method.
+     * The only difference is that this method does not return cached flows.
+     * It will always make a request to the node to get all the flows for that node.
+     * If the request times out or gets an error, we revert to getting the cached flows.
+     * @see IStatisticsManager#getFlows
+     * @param node
+     * @return List of flows installed on the network node.
+     */
+    List<FlowOnNode> getFlowsNoCache(Node node);
+
     /**
      * Returns the statistics for the flows specified in the list
      *
index 5d82c5c5dbd19075f3e63729f725ea9bf5e071cd..7281b9a71ffd7b08e60154e796b61ed5342b907e 100644 (file)
               org.slf4j,
               org.opendaylight.controller.sal.inventory,
               org.opendaylight.controller.sal.match,
+              org.opendaylight.controller.sal.connection,
               org.opendaylight.controller.switchmanager,
               org.opendaylight.controller.statisticsmanager,
               org.opendaylight.controller.forwardingrulesmanager,
+              org.opendaylight.controller.connectionmanager,
               org.apache.felix.dm
             </Import-Package>
             <Bundle-Activator>
@@ -93,7 +95,7 @@
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>statisticsmanager</artifactId>
-      <version>0.4.1-SNAPSHOT</version>
+      <version>0.5.0-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>clustering.services</artifactId>
       <version>0.4.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>connectionmanager</artifactId>
+      <version>0.1.1-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
index 5afaaa69167f76081c35468693623af8a2f654f5..7eb32b05f7a95fa58cb636fa5288391b63cc4747 100644 (file)
@@ -9,8 +9,15 @@
 
 package org.opendaylight.controller.statisticsmanager.internal;
 
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+
 import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
@@ -56,10 +63,20 @@ public class Activator extends ComponentActivatorAbstractBase {
     public void configureInstance(Component c, Object imp, String containerName) {
         if (imp.equals(StatisticsManager.class)) {
             // export the service
-            c.setInterface(new String[] {
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            Set<String> propSet = new HashSet<String>();
+            // trigger cache
+            propSet.add(StatisticsManager.TRIGGERS_CACHE);
+            // flow statistics cache
+            propSet.add(StatisticsManager.FLOW_STATISTICS_CACHE);
+            props.put("cachenames", propSet);
+
+            String interfaces[] = new String[] {
                     IStatisticsManager.class.getName(),
                     IReadServiceListener.class.getName(),
-                    IListenInventoryUpdates.class.getName() }, null);
+                    IListenInventoryUpdates.class.getName(),
+                    ICacheUpdateAware.class.getName() };
+            c.setInterface(interfaces, props);
 
             c.add(createContainerServiceDependency(containerName).setService(IReadService.class)
                     .setCallbacks("setReaderService", "unsetReaderService").setRequired(true));
@@ -67,6 +84,8 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setClusterContainerService", "unsetClusterContainerService").setRequired(true));
             c.add(createContainerServiceDependency(containerName).setService(IContainer.class)
                     .setCallbacks("setIContainer", "unsetIContainer").setRequired(true));
+            c.add(createServiceDependency().setService(IConnectionManager.class)
+                    .setCallbacks("setIConnectionManager", "unsetIConnectionManager").setRequired(false));
 
         }
     }
index 02b6251e9f7076073b576b398df18a641d690582..92ed44efbbde51e80046048f3d4afe9e1a4de56f 100644 (file)
@@ -10,6 +10,7 @@
 package org.opendaylight.controller.statisticsmanager.internal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -19,12 +20,20 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
@@ -49,22 +58,47 @@ import org.slf4j.LoggerFactory;
  * The class caches latest network nodes statistics as notified by reader
  * services and provides API to retrieve them.
  */
-public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates {
+public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
+    ICacheUpdateAware<Object,Object> {
     private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
     private IContainer container;
     private IClusterContainerServices clusterContainerService;
     private IReadService reader;
+    private IConnectionManager connectionManager;
     //statistics caches
     private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
     private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
     private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
     private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
 
+    // data structure for latches
+    // this is not a cluster cache
+    private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
+    // 30 seconds is the timeout.
+    // the value of this can be tweaked based on performance tests.
+    private static long latchTimeout = 30;
+
+    // cache for flow stats refresh triggers
+    // an entry added to this map triggers the statistics manager
+    // to which the node is connected to get the latest flow stats from that node
+    // this is a cluster cache
+    private ConcurrentMap<Integer, Node> triggers;
+
+    // use an atomic integer for the triggers key
+    private AtomicInteger triggerKey = new AtomicInteger();
+
+    // single thread executor for the triggers
+    private ExecutorService triggerExecutor;
+
+    static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
+    static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
+
     private void nonClusterObjectCreate() {
         flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
         nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
         tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
         descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
+        triggers = new ConcurrentHashMap<Integer, Node>();
     }
 
     @SuppressWarnings("deprecation")
@@ -76,7 +110,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         }
 
         try {
-            clusterContainerService.createCache("statisticsmanager.flowStatistics",
+            clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
             clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
@@ -84,7 +118,8 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
             clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
+            clusterContainerService.createCache(TRIGGERS_CACHE,
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
         } catch (CacheConfigException cce) {
             log.error("Statistics cache configuration invalid - check cache mode");
         } catch (CacheExistException ce) {
@@ -102,7 +137,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
 
         log.debug("Statistics Manager - retrieveCaches for Container {}", container);
 
-        map = clusterContainerService.getCache("statisticsmanager.flowStatistics");
+        map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
         if (map != null) {
             this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
         } else {
@@ -129,6 +164,13 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         } else {
             log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
         }
+
+        map = clusterContainerService.getCache(TRIGGERS_CACHE);
+        if (map != null) {
+            this.triggers = (ConcurrentMap<Integer, Node>) map;
+        } else {
+            log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
+        }
     }
 
     /**
@@ -161,6 +203,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
      */
     void start() {
         log.debug("START called!");
+        this.triggerExecutor = Executors.newSingleThreadExecutor();
     }
 
     /**
@@ -204,6 +247,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
      */
     void stop() {
         log.debug("STOP called!");
+        this.triggerExecutor.shutdownNow();
     }
 
     void setClusterContainerService(IClusterContainerServices s) {
@@ -239,7 +283,7 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     @Override
     public List<FlowOnNode> getFlows(Node node) {
         if (node == null) {
-            return null;
+            return Collections.emptyList();
         }
 
         List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
@@ -250,6 +294,62 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
         return flowList;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<FlowOnNode> getFlowsNoCache(Node node) {
+        if (node == null) {
+            return Collections.emptyList();
+        }
+        // check if the node is local to this controller
+        ConnectionLocality locality = ConnectionLocality.LOCAL;
+        if(this.connectionManager != null) {
+            locality = this.connectionManager.getLocalityStatus(node);
+        }
+        if (locality == ConnectionLocality.NOT_LOCAL) {
+            // send a trigger to all and wait for either a response or timeout
+            CountDownLatch newLatch = new CountDownLatch(1);
+            CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
+            this.triggers.put(this.triggerKey.incrementAndGet(), node);
+            try {
+                boolean retStatus;
+                if(oldLatch != null) {
+                    retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
+                } else {
+                    retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
+                }
+                // log the return code as it will give us, if
+                // the latch timed out.
+                log.debug("latch timed out {}", !retStatus);
+            } catch (InterruptedException e) {
+                // log the error and move on
+                log.warn("Waiting for statistics response interrupted", e);
+                // restore the interrupt status
+                // its a good practice to restore the interrupt status
+                // if you are not propagating the InterruptedException
+                Thread.currentThread().interrupt();
+            }
+            // now that the wait is over
+            // remove the latch entry
+            this.latches.remove(node);
+        } else {
+            // the node is local.
+            // call the read service
+            if (this.reader != null) {
+                List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
+                if (flows != null) {
+                    nodeFlowStatisticsUpdated(node, flows);
+                }
+            }
+        }
+        // at this point we are ready to return the cached value.
+        // this cached value will be up to date with a very high probability
+        // due to what we have done previously ie:- send a trigger for cache update
+        // or refreshed the cache if the node is local.
+        return getFlows(node);
+    }
+
     @Override
     public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
         Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
@@ -414,4 +514,85 @@ public class StatisticsManager implements IStatisticsManager, IReadServiceListen
     public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
         // Not interested in this update
     }
+
+    public void unsetIConnectionManager(IConnectionManager s) {
+        if (s == this.connectionManager) {
+            this.connectionManager = null;
+        }
+    }
+
+    public void setIConnectionManager(IConnectionManager s) {
+        this.connectionManager = s;
+    }
+
+    @Override
+    public void entryCreated(Object key, String cacheName, boolean originLocal) {
+        /*
+         * Do nothing
+         */
+    }
+
+    @Override
+    public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+        if (originLocal) {
+            /*
+             * Local updates are of no interest
+             */
+            return;
+        }
+        if (cacheName.equals(TRIGGERS_CACHE)) {
+            log.trace("Got a trigger for key {} : value {}", key, new_value);
+            final Node n = (Node) new_value;
+            // check if the node is local to this controller
+            ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
+            if(this.connectionManager != null) {
+                locality = this.connectionManager.getLocalityStatus(n);
+            }
+            if (locality == ConnectionLocality.LOCAL) {
+                log.trace("trigger for node {} processes locally", n);
+                // delete the trigger and proceed with handling the trigger
+                this.triggers.remove(key);
+                // this is a potentially long running task
+                // off load it from the listener thread
+                Runnable r = new Runnable() {
+                    @Override
+                    public void run() {
+                        // the node is local.
+                        // call the read service
+                        if (reader != null) {
+                            List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
+                            if (flows != null) {
+                                flowStatistics.put(n, flows);
+                            }
+                        }
+                    }
+                };
+                // submit the runnable for execution
+                if(this.triggerExecutor != null) {
+                    this.triggerExecutor.execute(r);
+                }
+            }
+        } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
+            // flow statistics cache updated
+            // get the node
+            log.trace("Got a flow statistics cache update for key {}", key);
+            // this is a short running task
+            // no need of off loading from the listener thread
+            final Node n = (Node) key;
+            // check if an outstanding trigger exists for this node
+            CountDownLatch l = this.latches.get(n);
+            if(l != null) {
+                // someone was waiting for this update
+                // let him know
+                l.countDown();
+            }
+        }
+    }
+
+    @Override
+    public void entryDeleted(Object key, String cacheName, boolean originLocal) {
+        /*
+         * Do nothing
+         */
+    }
 }
index 915717da70fc784e8b5524f1bfeda6919ca2cb58..c8b458ea777c3b0e3e4d377e18b459627a6facb1 100644 (file)
@@ -60,7 +60,7 @@
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>statisticsmanager</artifactId>
-      <version>0.4.1-SNAPSHOT</version>
+      <version>0.5.0-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>topologymanager</artifactId>
       <version>0.4.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>connectionmanager</artifactId>
+      <version>0.1.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>connectionmanager.implementation</artifactId>
+      <version>0.1.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal.connection</artifactId>
+      <version>0.1.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal.connection.implementation</artifactId>
+      <version>0.1.1-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
index 457e99dcd415b951747bff05077a9d18853212dd..292f4cf8d373c822b9ecb0fe10c93ef2b64db09d 100644 (file)
@@ -116,6 +116,14 @@ public class StatisticsManagerIT {
                     .versionAsInProject(),
                 mavenBundle("org.opendaylight.controller", "forwardingrulesmanager")
                     .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "connectionmanager.implementation").
+                    versionAsInProject(),
+                mavenBundle("org.opendaylight.controller",  "connectionmanager").
+                    versionAsInProject(),
+                mavenBundle("org.opendaylight.controller",  "sal.connection").
+                    versionAsInProject(),
+                mavenBundle("org.opendaylight.controller",  "sal.connection.implementation").
+                    versionAsInProject(),
 
                 // needed by hosttracker
                 mavenBundle("org.opendaylight.controller", "topologymanager")