From: Robert Varga Date: Thu, 13 Feb 2014 02:03:13 +0000 (+0100) Subject: Split off tracking of node presence X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~450^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=38ba1c1a67f4ee58c2e2ea64a51e2624c1c116ee Split off tracking of node presence Introducing FlowCapableTracker. It's sole responsibility is to react to FlowCapableNodes appearing and disappearing in the inventory and spinning up statistics handlers based on that. This makes the NodeStatisticsHandler lifecycle more predictable, thus we can activate the 'ignore' part of the getStatisticsHandler(). Final part limits the visibility of the various 'getFooStats' methods, so we can reason about them better. Change-Id: Icdbcca6038ae7fb0bff06d378fafa0f476f0fe18 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java new file mode 100644 index 0000000000..075348d54c --- /dev/null +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java @@ -0,0 +1,107 @@ +/* + * Copyright IBM Corporation, 2013. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.statistics.manager; + +import java.util.Collection; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Collections2; +import com.google.common.collect.Sets; + +/** + * There is a single instance of this class and that instance is responsible for + * monitoring the operational data store for nodes being created/deleted and + * notifying StatisticsProvider. These events then control the lifecycle of + * NodeStatisticsHandler for a particular switch. + */ +final class FlowCapableTracker implements DataChangeListener { + private static final Logger logger = LoggerFactory.getLogger(FlowCapableTracker.class); + + private final InstanceIdentifier root; + private final StatisticsProvider stats; + + private final Predicate> filterIdentifiers = new Predicate>() { + @Override + public boolean apply(final InstanceIdentifier input) { + /* + * This notification has been triggered either by the ancestor, + * descendant or directly for the FlowCapableNode itself. We + * are not interested descendants, so let's prune them based + * on the depth of their identifier. + */ + if (root.getPath().size() < input.getPath().size()) { + logger.debug("Ignoring notification for descendant {}", input); + return false; + } + + logger.debug("Including notification for {}", input); + return true; + } + }; + + public FlowCapableTracker(final StatisticsProvider stats, InstanceIdentifier root) { + this.stats = Preconditions.checkNotNull(stats); + this.root = Preconditions.checkNotNull(root); + } + + /* + * This method is synchronized because we want to make sure to serialize input + * from the datastore. Competing add/remove could be problematic otherwise. + */ + @Override + public synchronized void onDataChanged(final DataChangeEvent, DataObject> change) { + /* + * First process all the identifiers which were removed, trying to figure out + * whether they constitute removal of FlowCapableNode. + */ + final Collection removedNodes = + Collections2.filter(Collections2.transform( + Sets.filter(change.getRemovedOperationalData(), filterIdentifiers), + new Function, NodeKey>() { + @Override + public NodeKey apply(final InstanceIdentifier input) { + final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class); + if (key == null) { + // FIXME: do we have a backup plan? + logger.info("Failed to extract node key from {}", input); + } + return key; + } + }), Predicates.notNull()); + stats.stopNodeHandlers(removedNodes); + + final Collection addedNodes = + Collections2.filter(Collections2.transform( + Sets.filter(change.getCreatedOperationalData().keySet(), filterIdentifiers), + new Function, NodeKey>() { + @Override + public NodeKey apply(final InstanceIdentifier input) { + final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class); + if (key == null) { + // FIXME: do we have a backup plan? + logger.info("Failed to extract node key from {}", input); + } + return key; + } + }), Predicates.notNull()); + stats.startNodeHandlers(addedNodes); + } +} diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java index 395bacb5e3..e9cecc521e 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java @@ -103,7 +103,7 @@ import com.google.common.base.Preconditions; * * @author avishnoi@in.ibm.com */ -public class NodeStatisticsHandler { +public class NodeStatisticsHandler implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class); private static final int NUMBER_OF_WAIT_CYCLES = 2; @@ -767,4 +767,10 @@ public class NodeStatisticsHandler { InstanceIdentifier nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance(); trans.removeOperationalData(nodeGroupStatisticsAugmentation); } + + @Override + public void close() { + // FIXME: cleanup any resources we hold (registrations, etc.) + logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); + } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java index ab5d20a951..4851b441c3 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java @@ -8,17 +8,18 @@ package org.opendaylight.controller.md.statistics.manager; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.eclipse.xtext.xbase.lib.Exceptions; import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; +import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -64,6 +65,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216. import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; @@ -96,7 +98,7 @@ public class StatisticsProvider implements AutoCloseable { private final DataProviderService dps; //Local caching of stats - private final ConcurrentMap statisticsCache = new ConcurrentHashMap<>(); + private final ConcurrentMap handlers = new ConcurrentHashMap<>(); private OpendaylightGroupStatisticsService groupStatsService; @@ -129,14 +131,11 @@ public class StatisticsProvider implements AutoCloseable { private Registration listenerRegistration; - public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) { - - this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter); + private ListenerRegistration flowCapableTrackerRegistration; - statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this); - registerDataStoreUpdateListener(dbs); + public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) { - // Get Group/Meter statistics service instance + // Get Group/Meter statistics service instances groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class); meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class); flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class); @@ -144,6 +143,18 @@ public class StatisticsProvider implements AutoCloseable { flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class); queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class); + // Start receiving notifications + this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter); + + // Register for switch connect/disconnect notifications + final InstanceIdentifier fcnId = InstanceIdentifier.builder(Nodes.class) + .child(Node.class).augmentation(FlowCapableNode.class).build(); + this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId, + new FlowCapableTracker(this, fcnId)); + + statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this); + registerDataStoreUpdateListener(dbs); + statisticsRequesterThread = new Thread( new Runnable(){ @Override @@ -170,7 +181,7 @@ public class StatisticsProvider implements AutoCloseable { public void run() { while(true){ try { - for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){ + for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){ nodeStatisticsAger.cleanStaleStatistics(); } multipartMessageManager.cleanStaleTransactionIds(); @@ -191,10 +202,7 @@ public class StatisticsProvider implements AutoCloseable { } private void registerDataStoreUpdateListener(DataBrokerService dbs) { - //Register for Node updates - InstanceIdentifier pathNode = InstanceIdentifier.builder(Nodes.class) - .child(Node.class).toInstance(); - dbs.registerDataChangeListener(pathNode, statsUpdateHandler); + // FIXME: the below should be broken out into StatisticsUpdateHandler //Register for flow updates InstanceIdentifier pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class) @@ -239,22 +247,22 @@ public class StatisticsProvider implements AutoCloseable { for (Node targetNode : targetNodes){ if(targetNode.getAugmentation(FlowCapableNode.class) != null){ - sendStatisticsRequestsToNode(targetNode); + sendStatisticsRequestsToNode(targetNode.getKey()); } } } - public void sendStatisticsRequestsToNode(Node targetNode){ + private void sendStatisticsRequestsToNode(NodeKey targetNode){ spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId()); - InstanceIdentifier targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance(); + InstanceIdentifier targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance(); NodeRef targetNodeRef = new NodeRef(targetInstanceId); try{ if(flowStatsService != null){ - sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey()); + sendAggregateFlowsStatsFromAllTablesRequest(targetNode); sendAllFlowsStatsFromAllTablesRequest(targetNodeRef); } if(flowTableStatsService != null){ @@ -280,7 +288,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException { + private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException { final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder(); @@ -294,7 +302,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); @@ -323,7 +331,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{ + private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{ List tablesId = getTablesFromNode(targetNodeKey); @@ -337,7 +345,7 @@ public class StatisticsProvider implements AutoCloseable { } } - public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{ + private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{ spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey); GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = @@ -353,7 +361,7 @@ public class StatisticsProvider implements AutoCloseable { , StatsRequestType.AGGR_FLOW); } - public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder(); @@ -366,7 +374,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder(); @@ -393,7 +401,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder(); @@ -421,7 +429,7 @@ public class StatisticsProvider implements AutoCloseable { } - public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException { + private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException { GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder(); input.setNode(targetNode); @@ -457,13 +465,11 @@ public class StatisticsProvider implements AutoCloseable { */ public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) { Preconditions.checkNotNull(nodeId); - NodeStatisticsHandler ager = statisticsCache.get(nodeId); - if (ager == null) { - ager = new NodeStatisticsHandler(this, new NodeKey(nodeId)); - statisticsCache.put(nodeId, ager); + NodeStatisticsHandler handler = handlers.get(nodeId); + if (handler == null) { + spLogger.info("Attempted to get non-existing handler for {}", nodeId); } - - return ager; + return handler; } private List getAllConnectedNodes(){ @@ -496,24 +502,50 @@ public class StatisticsProvider implements AutoCloseable { return nodeKey.getId(); } - @SuppressWarnings("deprecation") @Override - public void close(){ - + public void close() { try { - spLogger.info("Statistics Provider stopped."); if (this.listenerRegistration != null) { - this.listenerRegistration.close(); - this.statisticsRequesterThread.destroy(); - this.statisticsAgerThread.destroy(); + } + if (this.flowCapableTrackerRegistration != null) { + this.flowCapableTrackerRegistration.close(); + this.flowCapableTrackerRegistration = null; + } + } catch (Exception e) { + spLogger.warn("Failed to stop Statistics Provider completely", e); + } finally { + spLogger.info("Statistics Provider stopped."); + } + } + synchronized void startNodeHandlers(final Collection addedNodes) { + for (NodeKey key : addedNodes) { + if (handlers.containsKey(key.getId())) { + spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId()); + continue; } - } catch (Throwable e) { - throw Exceptions.sneakyThrow(e); - } + + final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key); + handlers.put(key.getId(), h); + spLogger.debug("Started node handler for {}", key.getId()); + + // FIXME: this should be in the NodeStatisticsHandler itself + sendStatisticsRequestsToNode(key); + } } + synchronized void stopNodeHandlers(final Collection removedNodes) { + for (NodeKey key : removedNodes) { + final NodeStatisticsHandler s = handlers.remove(key.getId()); + if (s != null) { + spLogger.debug("Stopping node handler for {}", key.getId()); + s.close(); + } else { + spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId()); + } + } + } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java index acf182ad2b..d188a74a00 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java @@ -14,7 +14,6 @@ import java.util.concurrent.ExecutionException; import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; @@ -57,19 +56,6 @@ public class StatisticsUpdateHandler implements DataChangeListener { @SuppressWarnings("unchecked") @Override public void onDataChanged(DataChangeEvent, DataObject> change) { - - Map, DataObject> nodeAdditions = change.getCreatedOperationalData(); - for (InstanceIdentifier dataObjectInstance : nodeAdditions.keySet()) { - DataObject dataObject = nodeAdditions.get(dataObjectInstance); - if(dataObject instanceof Node){ - - Node node = (Node) dataObject; - if(node.getAugmentation(FlowCapableNode.class) != null){ - this.statisticsManager.sendStatisticsRequestsToNode(node); - } - } - } - Map, DataObject> additions = change.getCreatedConfigurationData(); for (InstanceIdentifier dataObjectInstance : additions.keySet()) { DataObject dataObject = additions.get(dataObjectInstance);