--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collection;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
+
+/**
+ * There is a single instance of this class and that instance is responsible for
+ * monitoring the operational data store for nodes being created/deleted and
+ * notifying StatisticsProvider. These events then control the lifecycle of
+ * NodeStatisticsHandler for a particular switch.
+ */
+final class FlowCapableTracker implements DataChangeListener {
+    private static final Logger logger = LoggerFactory.getLogger(FlowCapableTracker.class);
+
+    /*
+     * Maps an identifier to the key of the Node it lives under, logging and
+     * returning null when no Node key can be extracted. Shared by the removal
+     * and creation paths; null results are filtered out by the caller.
+     */
+    private static final Function<InstanceIdentifier<?>, NodeKey> NODE_KEY_EXTRACTOR =
+            new Function<InstanceIdentifier<?>, NodeKey>() {
+                @Override
+                public NodeKey apply(final InstanceIdentifier<?> input) {
+                    final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class);
+                    if (key == null) {
+                        // FIXME: do we have a backup plan?
+                        logger.info("Failed to extract node key from {}", input);
+                    }
+                    return key;
+                }
+            };
+
+    // Identifier of the FlowCapableNode augmentation this tracker listens on
+    private final InstanceIdentifier<FlowCapableNode> root;
+    private final StatisticsProvider stats;
+
+    private final Predicate<InstanceIdentifier<?>> filterIdentifiers = new Predicate<InstanceIdentifier<?>>() {
+        @Override
+        public boolean apply(final InstanceIdentifier<?> input) {
+            /*
+             * This notification has been triggered either by the ancestor,
+             * descendant or directly for the FlowCapableNode itself. We are
+             * not interested in descendants, so let's prune them based on
+             * the depth of their identifier.
+             */
+            if (root.getPath().size() < input.getPath().size()) {
+                logger.debug("Ignoring notification for descendant {}", input);
+                return false;
+            }
+
+            logger.debug("Including notification for {}", input);
+            return true;
+        }
+    };
+
+    public FlowCapableTracker(final StatisticsProvider stats, final InstanceIdentifier<FlowCapableNode> root) {
+        this.stats = Preconditions.checkNotNull(stats);
+        this.root = Preconditions.checkNotNull(root);
+    }
+
+    /*
+     * This method is synchronized because we want to make sure to serialize input
+     * from the datastore. Competing add/remove could be problematic otherwise.
+     */
+    @Override
+    public synchronized void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        /*
+         * First process all the identifiers which were removed, trying to figure out
+         * whether they constitute removal of FlowCapableNode.
+         */
+        stats.stopNodeHandlers(extractNodeKeys(change.getRemovedOperationalData()));
+
+        // Then start handlers for all newly-created FlowCapableNodes.
+        stats.startNodeHandlers(extractNodeKeys(change.getCreatedOperationalData().keySet()));
+    }
+
+    /*
+     * Prunes descendant identifiers, maps the remainder to Node keys and drops
+     * the entries for which no key could be extracted.
+     */
+    private Collection<NodeKey> extractNodeKeys(final Set<InstanceIdentifier<?>> identifiers) {
+        return Collections2.filter(
+                Collections2.transform(Sets.filter(identifiers, filterIdentifiers), NODE_KEY_EXTRACTOR),
+                Predicates.notNull());
+    }
+}
package org.opendaylight.controller.md.statistics.manager;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.eclipse.xtext.xbase.lib.Exceptions;
import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private final DataProviderService dps;
//Local caching of stats
- private final ConcurrentMap<NodeId,NodeStatisticsHandler> statisticsCache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<NodeId,NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
private OpendaylightGroupStatisticsService groupStatsService;
private Registration<NotificationListener> listenerRegistration;
- public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
-
- this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+ private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
- statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
- registerDataStoreUpdateListener(dbs);
+ public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
- // Get Group/Meter statistics service instance
+ // Get Group/Meter statistics service instances
groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
+ // Start receiving notifications
+ this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+
+ // Register for switch connect/disconnect notifications
+ final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class).augmentation(FlowCapableNode.class).build();
+ this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
+ new FlowCapableTracker(this, fcnId));
+
+ statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+ registerDataStoreUpdateListener(dbs);
+
statisticsRequesterThread = new Thread( new Runnable(){
@Override
public void run() {
while(true){
try {
- for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){
+ for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
nodeStatisticsAger.cleanStaleStatistics();
}
multipartMessageManager.cleanStaleTransactionIds();
}
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
- //Register for Node updates
- InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class).toInstance();
- dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+ // FIXME: the below should be broken out into StatisticsUpdateHandler
//Register for flow updates
InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
for (Node targetNode : targetNodes){
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
- sendStatisticsRequestsToNode(targetNode);
+ sendStatisticsRequestsToNode(targetNode.getKey());
}
}
}
- public void sendStatisticsRequestsToNode(Node targetNode){
+ private void sendStatisticsRequestsToNode(NodeKey targetNode){
spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
- InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+ InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance();
NodeRef targetNodeRef = new NodeRef(targetInstanceId);
try{
if(flowStatsService != null){
- sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+ sendAggregateFlowsStatsFromAllTablesRequest(targetNode);
sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
}
if(flowTableStatsService != null){
}
- public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+ private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
}
- public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
}
- public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+ private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
List<Short> tablesId = getTablesFromNode(targetNodeKey);
}
}
- public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+ private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
, StatsRequestType.AGGR_FLOW);
}
- public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
}
- public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
}
- public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
}
- public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+ private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
input.setNode(targetNode);
*/
public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
Preconditions.checkNotNull(nodeId);
- NodeStatisticsHandler ager = statisticsCache.get(nodeId);
- if (ager == null) {
- ager = new NodeStatisticsHandler(this, new NodeKey(nodeId));
- statisticsCache.put(nodeId, ager);
+ // Handlers are registered by startNodeHandlers(); this lookup no longer
+ // creates one implicitly, it just reports the miss.
+ NodeStatisticsHandler handler = handlers.get(nodeId);
+ if (handler == null) {
+ spLogger.info("Attempted to get non-existing handler for {}", nodeId);
}
-
- return ager;
+ // NOTE(review): may now return null — verify all callers handle that.
+ return handler;
}
private List<Node> getAllConnectedNodes(){
return nodeKey.getId();
}
- @SuppressWarnings("deprecation")
@Override
- public void close(){
-
+ // Shuts down the notification listener, the flow-capable tracker
+ // registration and the statistics threads; failures are logged, not
+ // propagated, and the "stopped" message is always emitted.
+ public void close() {
try {
- spLogger.info("Statistics Provider stopped.");
if (this.listenerRegistration != null) {
-
this.listenerRegistration.close();
-
+ // NOTE(review): Thread.destroy() is deprecated and unimplemented on
+ // modern JVMs (throws at runtime) — consider interrupt() instead; the
+ // @SuppressWarnings("deprecation") removed above will also resurface
+ // compiler warnings here.
this.statisticsRequesterThread.destroy();
-
this.statisticsAgerThread.destroy();
+ }
+ if (this.flowCapableTrackerRegistration != null) {
+ this.flowCapableTrackerRegistration.close();
+ this.flowCapableTrackerRegistration = null;
+ }
+ } catch (Exception e) {
+ spLogger.warn("Failed to stop Statistics Provider completely", e);
+ } finally {
+ spLogger.info("Statistics Provider stopped.");
+ }
+ }
+ /**
+ * Instantiates and registers a NodeStatisticsHandler for each newly-seen
+ * switch, then kicks off an initial statistics request for it. Synchronized
+ * so competing add/remove events from the datastore are serialized.
+ */
+ synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+ for (NodeKey key : addedNodes) {
+ if (handlers.containsKey(key.getId())) {
+ spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
+ continue;
}
- } catch (Throwable e) {
- throw Exceptions.sneakyThrow(e);
- }
+
+ final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
+ handlers.put(key.getId(), h);
+ spLogger.debug("Started node handler for {}", key.getId());
+
+ // FIXME: this should be in the NodeStatisticsHandler itself
+ sendStatisticsRequestsToNode(key);
+ }
}
+ /**
+ * Closes and unregisters the NodeStatisticsHandler of each disconnected
+ * switch; unknown keys are logged and skipped. Synchronized to serialize
+ * with startNodeHandlers().
+ */
+ synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+ for (NodeKey key : removedNodes) {
+ final NodeStatisticsHandler s = handlers.remove(key.getId());
+ if (s != null) {
+ spLogger.debug("Stopping node handler for {}", key.getId());
+ s.close();
+ } else {
+ spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
+ }
+ }
+ }
}