Split off tracking of node presence 90/5290/6
authorRobert Varga <rovarga@cisco.com>
Thu, 13 Feb 2014 02:03:13 +0000 (03:03 +0100)
committerRobert Varga <rovarga@cisco.com>
Fri, 14 Feb 2014 19:35:00 +0000 (20:35 +0100)
Introducing FlowCapableTracker. It's sole responsibility is to react
to FlowCapableNodes appearing and disappearing in the inventory and
spinning up statistics handlers based on that. This makes the
NodeStatisticsHandler lifecycle more predictable, thus we can activate
the 'ignore' part of the getStatisticsHandler(). Final part limits the
visibility of the various 'getFooStats' methods, so we can reason about
them better.

Change-Id: Icdbcca6038ae7fb0bff06d378fafa0f476f0fe18
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java

diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java
new file mode 100644 (file)
index 0000000..075348d
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collection;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
+
+/**
+ * There is a single instance of this class and that instance is responsible for
+ * monitoring the operational data store for nodes being created/deleted and
+ * notifying StatisticsProvider. These events then control the lifecycle of
+ * NodeStatisticsHandler for a particular switch.
+ */
+final class FlowCapableTracker implements DataChangeListener {
+    private static final Logger logger = LoggerFactory.getLogger(FlowCapableTracker.class);
+
+    private final InstanceIdentifier<FlowCapableNode> root;
+    private final StatisticsProvider stats;
+
+    private final Predicate<InstanceIdentifier<?>> filterIdentifiers = new Predicate<InstanceIdentifier<?>>() {
+        @Override
+        public boolean apply(final InstanceIdentifier<?> input) {
+            /*
+             * This notification has been triggered either by the ancestor,
+             * descendant or directly for the FlowCapableNode itself. We
+             * are not interested descendants, so let's prune them based
+             * on the depth of their identifier.
+             */
+            if (root.getPath().size() < input.getPath().size()) {
+                logger.debug("Ignoring notification for descendant {}", input);
+                return false;
+            }
+
+            logger.debug("Including notification for {}", input);
+            return true;
+        }
+    };
+
+    public FlowCapableTracker(final StatisticsProvider stats, InstanceIdentifier<FlowCapableNode> root) {
+        this.stats = Preconditions.checkNotNull(stats);
+        this.root = Preconditions.checkNotNull(root);
+    }
+
+    /*
+     * This method is synchronized because we want to make sure to serialize input
+     * from the datastore. Competing add/remove could be problematic otherwise.
+     */
+    @Override
+    public synchronized void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        /*
+         * First process all the identifiers which were removed, trying to figure out
+         * whether they constitute removal of FlowCapableNode.
+         */
+        final Collection<NodeKey> removedNodes =
+            Collections2.filter(Collections2.transform(
+                Sets.filter(change.getRemovedOperationalData(), filterIdentifiers),
+                new Function<InstanceIdentifier<?>, NodeKey>() {
+                    @Override
+                    public NodeKey apply(final InstanceIdentifier<?> input) {
+                        final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class);
+                        if (key == null) {
+                            // FIXME: do we have a backup plan?
+                            logger.info("Failed to extract node key from {}", input);
+                        }
+                        return key;
+                    }
+                }), Predicates.notNull());
+        stats.stopNodeHandlers(removedNodes);
+
+        final Collection<NodeKey> addedNodes =
+            Collections2.filter(Collections2.transform(
+                Sets.filter(change.getCreatedOperationalData().keySet(), filterIdentifiers),
+                new Function<InstanceIdentifier<?>, NodeKey>() {
+                    @Override
+                    public NodeKey apply(final InstanceIdentifier<?> input) {
+                        final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class);
+                        if (key == null) {
+                            // FIXME: do we have a backup plan?
+                            logger.info("Failed to extract node key from {}", input);
+                    }
+                    return key;
+                }
+            }), Predicates.notNull());
+        stats.startNodeHandlers(addedNodes);
+    }
+}
index 395bacb..e9cecc5 100644 (file)
@@ -103,7 +103,7 @@ import com.google.common.base.Preconditions;
  *
  * @author avishnoi@in.ibm.com
  */
-public class NodeStatisticsHandler {
+public class NodeStatisticsHandler implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
@@ -767,4 +767,10 @@ public class NodeStatisticsHandler {
         InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
         trans.removeOperationalData(nodeGroupStatisticsAugmentation);
     }
+
+    @Override
+    public void close() {
+        // FIXME: cleanup any resources we hold (registrations, etc.)
+        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
+    }
 }
index ab5d20a..4851b44 100644 (file)
@@ -8,17 +8,18 @@
 package org.opendaylight.controller.md.statistics.manager;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.eclipse.xtext.xbase.lib.Exceptions;
 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -64,6 +65,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -96,7 +98,7 @@ public class StatisticsProvider implements AutoCloseable {
     private final DataProviderService dps;
 
     //Local caching of stats
-    private final ConcurrentMap<NodeId,NodeStatisticsHandler> statisticsCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<NodeId,NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
 
     private OpendaylightGroupStatisticsService groupStatsService;
 
@@ -129,14 +131,11 @@ public class StatisticsProvider implements AutoCloseable {
 
     private Registration<NotificationListener> listenerRegistration;
 
-    public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
-
-        this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+    private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
 
-        statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
-        registerDataStoreUpdateListener(dbs);
+    public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
 
-        // Get Group/Meter statistics service instance
+        // Get Group/Meter statistics service instances
         groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
         meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
         flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
@@ -144,6 +143,18 @@ public class StatisticsProvider implements AutoCloseable {
         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
 
+        // Start receiving notifications
+        this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+
+        // Register for switch connect/disconnect notifications
+        final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
+                .child(Node.class).augmentation(FlowCapableNode.class).build();
+        this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
+                new FlowCapableTracker(this, fcnId));
+
+        statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+        registerDataStoreUpdateListener(dbs);
+
         statisticsRequesterThread = new Thread( new Runnable(){
 
             @Override
@@ -170,7 +181,7 @@ public class StatisticsProvider implements AutoCloseable {
             public void run() {
                 while(true){
                     try {
-                        for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){
+                        for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
                             nodeStatisticsAger.cleanStaleStatistics();
                         }
                         multipartMessageManager.cleanStaleTransactionIds();
@@ -191,10 +202,7 @@ public class StatisticsProvider implements AutoCloseable {
     }
 
     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
-        //Register for Node updates
-        InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
-                                                                        .child(Node.class).toInstance();
-        dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+        // FIXME: the below should be broken out into StatisticsUpdateHandler
 
         //Register for flow updates
         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
@@ -239,22 +247,22 @@ public class StatisticsProvider implements AutoCloseable {
         for (Node targetNode : targetNodes){
 
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
-                sendStatisticsRequestsToNode(targetNode);
+                sendStatisticsRequestsToNode(targetNode.getKey());
             }
         }
     }
 
-    public void sendStatisticsRequestsToNode(Node targetNode){
+    private void sendStatisticsRequestsToNode(NodeKey targetNode){
 
         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
 
-        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance();
 
         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
 
         try{
             if(flowStatsService != null){
-                sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+                sendAggregateFlowsStatsFromAllTablesRequest(targetNode);
                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
             }
             if(flowTableStatsService != null){
@@ -280,7 +288,7 @@ public class StatisticsProvider implements AutoCloseable {
     }
 
 
-    public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+    private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
         final GetFlowTablesStatisticsInputBuilder input =
                 new GetFlowTablesStatisticsInputBuilder();
 
@@ -294,7 +302,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
 
@@ -323,7 +331,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+    private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
 
         List<Short> tablesId = getTablesFromNode(targetNodeKey);
 
@@ -337,7 +345,7 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
 
-    public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+    private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
 
         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
@@ -353,7 +361,7 @@ public class StatisticsProvider implements AutoCloseable {
                 , StatsRequestType.AGGR_FLOW);
     }
 
-    public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
 
         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
 
@@ -366,7 +374,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
 
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
 
@@ -393,7 +401,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
 
         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
 
@@ -421,7 +429,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+    private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
 
         input.setNode(targetNode);
@@ -457,13 +465,11 @@ public class StatisticsProvider implements AutoCloseable {
      */
     public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
         Preconditions.checkNotNull(nodeId);
-        NodeStatisticsHandler ager = statisticsCache.get(nodeId);
-        if (ager == null) {
-            ager = new NodeStatisticsHandler(this, new NodeKey(nodeId));
-            statisticsCache.put(nodeId, ager);
+        NodeStatisticsHandler handler = handlers.get(nodeId);
+        if (handler == null) {
+            spLogger.info("Attempted to get non-existing handler for {}", nodeId);
         }
-
-        return ager;
+        return handler;
     }
 
     private List<Node> getAllConnectedNodes(){
@@ -496,24 +502,50 @@ public class StatisticsProvider implements AutoCloseable {
         return nodeKey.getId();
     }
 
-    @SuppressWarnings("deprecation")
     @Override
-    public void close(){
-
+    public void close() {
         try {
-            spLogger.info("Statistics Provider stopped.");
             if (this.listenerRegistration != null) {
-
                 this.listenerRegistration.close();
-
                 this.statisticsRequesterThread.destroy();
-
                 this.statisticsAgerThread.destroy();
+            }
+            if (this.flowCapableTrackerRegistration != null) {
+                this.flowCapableTrackerRegistration.close();
+                this.flowCapableTrackerRegistration = null;
+            }
+        } catch (Exception e) {
+            spLogger.warn("Failed to stop Statistics Provider completely", e);
+        } finally {
+            spLogger.info("Statistics Provider stopped.");
+        }
+    }
 
+    synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+        for (NodeKey key : addedNodes) {
+            if (handlers.containsKey(key.getId())) {
+                spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
+                continue;
             }
-          } catch (Throwable e) {
-            throw Exceptions.sneakyThrow(e);
-          }
+
+            final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
+            handlers.put(key.getId(), h);
+            spLogger.debug("Started node handler for {}", key.getId());
+
+            // FIXME: this should be in the NodeStatisticsHandler itself
+            sendStatisticsRequestsToNode(key);
+        }
     }
 
+    synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+        for (NodeKey key : removedNodes) {
+            final NodeStatisticsHandler s = handlers.remove(key.getId());
+            if (s != null) {
+                spLogger.debug("Stopping node handler for {}", key.getId());
+                s.close();
+            } else {
+                spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
+            }
+        }
+    }
 }
index acf182a..d188a74 100644 (file)
@@ -14,7 +14,6 @@ import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
@@ -57,19 +56,6 @@ public class StatisticsUpdateHandler implements DataChangeListener {
     @SuppressWarnings("unchecked")
     @Override
     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-
-        Map<InstanceIdentifier<?>, DataObject> nodeAdditions = change.getCreatedOperationalData();
-        for (InstanceIdentifier<? extends DataObject> dataObjectInstance : nodeAdditions.keySet()) {
-            DataObject dataObject = nodeAdditions.get(dataObjectInstance);
-            if(dataObject instanceof Node){
-
-                Node node = (Node) dataObject;
-                if(node.getAugmentation(FlowCapableNode.class) != null){
-                    this.statisticsManager.sendStatisticsRequestsToNode(node);
-                }
-            }
-        }
-
         Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
         for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
             DataObject dataObject = additions.get(dataObjectInstance);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.