Move queue/meter/flow listeners into their trackers
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
index 17f1ce2a7be482459d1fa78f6423f0bb952b4e92..691b9c0b15f58a9d6a01d05e7146c8a69251e332 100644 (file)
@@ -11,6 +11,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -19,11 +20,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
@@ -35,18 +41,24 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * This class handles the lifecycle of per-node statistics. It receives data
@@ -55,10 +67,11 @@ import com.google.common.base.Preconditions;
  *
  * @author avishnoi@in.ibm.com
  */
-public final class NodeStatisticsHandler implements AutoCloseable {
+public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
+    private final MultipartMessageManager msgManager = new MultipartMessageManager();
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -72,71 +85,95 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     private final NodeRef targetNodeRef;
     private final NodeKey targetNodeKey;
 
-    public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey) {
+    public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
+            final OpendaylightFlowStatisticsService flowStatsService,
+            final OpendaylightFlowTableStatisticsService flowTableStatsService,
+            final OpendaylightGroupStatisticsService groupStatsService,
+            final OpendaylightMeterStatisticsService meterStatsService,
+            final OpendaylightPortStatisticsService portStatsService,
+            final OpendaylightQueueStatisticsService queueStatsService) {
         this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
 
         final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
-        flowStats = new FlowStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        flowTableStats = new FlowTableStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        groupDescStats = new GroupDescStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        groupStats = new GroupStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        meterConfigStats = new MeterConfigStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        meterStats = new MeterStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        nodeConnectorStats = new NodeConnectorStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        queueStats = new QueueStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+
+        flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
+        groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
+        groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
+        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
+        meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
+        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
+        queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
     }
 
     public NodeKey getTargetNodeKey() {
         return targetNodeKey;
     }
 
-    public Collection<TableKey> getKnownTables() {
-        return flowTableStats.getTables();
-    }
-
-    public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+    @Override
+    public InstanceIdentifier<Node> getNodeIdentifier() {
         return targetNodeIdentifier;
     }
 
-    public NodeRef getTargetNodeRef() {
+    @Override
+    public NodeRef getNodeRef() {
         return targetNodeRef;
     }
 
-    public synchronized void updateGroupDescStats(List<GroupDescStats> list) {
-        groupDescStats.updateStats(list);
+    @Override
+    public DataModificationTransaction startDataModification() {
+        return dps.beginTransaction();
     }
 
-    public synchronized void updateGroupStats(List<GroupStats> list) {
-        groupStats.updateStats(list);
+    public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupDescStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
-        meterConfigStats.updateStats(list);
+    public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateMeterStats(List<MeterStats> list) {
-        meterStats.updateStats(list);
+    public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterConfigStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
-        queueStats.updateStats(list);
+    public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
-        flowTableStats.updateStats(list);
+    public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            queueStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        nodeConnectorStats.updateStats(list);
+    public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowTableStats.updateStats(list);
+        }
+    }
+
+    public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            nodeConnectorStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+    public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
+        final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
         if (tableId != null) {
             final DataModificationTransaction trans = dps.beginTransaction();
-
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -153,11 +190,16 @@ public final class NodeStatisticsHandler implements AutoCloseable {
             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
             trans.putOperationalData(tableRef, tableBuilder.build());
 
-            // FIXME: should we be tracking this data?
             trans.commit();
         }
     }
 
+    public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowStats.updateStats(list);
+        }
+    }
+
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
         final DataModificationTransaction trans = dps.beginTransaction();
 
@@ -194,10 +236,6 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         trans.commit();
     }
 
-    public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
-        flowStats.updateStats(list);
-    }
-
     public synchronized void cleanStaleStatistics() {
         final DataModificationTransaction trans = dps.beginTransaction();
         final long now = System.nanoTime();
@@ -209,13 +247,86 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         meterStats.cleanup(trans, now);
         nodeConnectorStats.cleanup(trans, now);
         queueStats.cleanup(trans, now);
+        msgManager.cleanStaleTransactionIds();
 
         trans.commit();
     }
 
+    public synchronized void requestPeriodicStatistics() {
+        logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
+
+        flowTableStats.request();
+
+        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+        //        comes back -- we do not have any tables anyway.
+        final Collection<TableKey> tables = flowTableStats.getTables();
+        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
+        for (final TableKey key : tables) {
+            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
+            flowStats.requestAggregateFlows(key);
+        }
+
+        flowStats.requestAllFlowsAllTables();
+        nodeConnectorStats.request();
+        groupStats.request();
+        groupDescStats.request();
+        meterStats.request();
+        meterConfigStats.request();
+        queueStats.request();
+    }
+
+    public synchronized void start() {
+        flowStats.start(dps);
+        groupDescStats.start(dps);
+        groupStats.start(dps);
+        meterConfigStats.start(dps);
+        meterStats.start(dps);
+        queueStats.start(dps);
+
+        requestPeriodicStatistics();
+    }
+
     @Override
-    public void close() {
-        // FIXME: cleanup any resources we hold (registrations, etc.)
+    public synchronized void close() {
+        flowStats.close();
+        groupDescStats.close();
+        groupStats.close();
+        meterConfigStats.close();
+        meterStats.close();
+        queueStats.close();
+
         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
     }
+
+    @Override
+    public void registerTransaction(final ListenableFuture<TransactionId> future, final StatsRequestType type) {
+        Futures.addCallback(future, new FutureCallback<TransactionId>() {
+            @Override
+            public void onSuccess(TransactionId result) {
+                msgManager.recordExpectedTransaction(result, type);
+                logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
+            }
+        });
+    }
+
+    @Override
+    public void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
+        Futures.addCallback(future, new FutureCallback<TransactionId>() {
+            @Override
+            public void onSuccess(TransactionId result) {
+                msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id);
+                logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
+            }
+        });
+    }
 }