Bug 781: Statistics manager is throwing IllegalStateException
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
index 413c01b1bc132a625b454b9c5e9bd0193d849e4b..5ace260251673bb9fa42b9da14a95a5ece724b4c 100644 (file)
@@ -9,16 +9,16 @@ package org.opendaylight.controller.md.statistics.manager;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
-import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -28,7 +28,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
@@ -36,7 +35,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -59,9 +57,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * This class handles the lifecycle of per-node statistics. It receives data
@@ -72,9 +67,12 @@ import com.google.common.util.concurrent.ListenableFuture;
  */
 public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
+
+    private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
+    private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
-    private final MultipartMessageManager msgManager = new MultipartMessageManager();
+    private final MultipartMessageManager msgManager;
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -87,6 +85,17 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     private final DataProviderService dps;
     private final NodeRef targetNodeRef;
     private final NodeKey targetNodeKey;
+    private final TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+            try{
+                requestPeriodicStatistics();
+                cleanStaleStatistics();
+            }catch(Exception e){
+                logger.warn("Exception occured while sending statistics request : {}",e);
+            }
+        }
+    };
 
     public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
             final OpendaylightFlowStatisticsService flowStatsService,
@@ -100,43 +109,17 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
 
-        final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
-
-        if (flowStatsService != null) {
-            flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
-        } else {
-            flowStats = null;
-        }
-        if (flowTableStatsService != null) {
-            flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
-        } else {
-            flowTableStats = null;
-        }
-
-        if (groupStatsService != null) {
-            groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
-            groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
-        } else {
-            groupDescStats = null;
-            groupStats = null;
-        }
-        if (meterStatsService != null) {
-            meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
-            meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
-        } else {
-            meterConfigStats = null;
-            meterStats = null;
-        }
-        if (portStatsService != null) {
-            nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
-        } else {
-            nodeConnectorStats = null;
-        }
-        if (queueStatsService != null) {
-            queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
-        } else {
-            queueStats = null;
-        }
+        final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+
+        msgManager = new MultipartMessageManager(lifetimeNanos);
+        flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
+        groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
+        groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
+        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
+        meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
+        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
+        queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
     }
 
     public NodeKey getTargetNodeKey() {
@@ -158,50 +141,50 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
         return dps.beginTransaction();
     }
 
-    public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             groupDescStats.updateStats(list);
         }
     }
 
-    public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateGroupStats(TransactionAware transaction, List<GroupStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             groupStats.updateStats(list);
         }
     }
 
-    public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateMeterConfigStats(TransactionAware transaction, List<MeterConfigStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             meterConfigStats.updateStats(list);
         }
     }
 
-    public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateMeterStats(TransactionAware transaction, List<MeterStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             meterStats.updateStats(list);
         }
     }
 
-    public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateQueueStats(TransactionAware transaction, List<QueueIdAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             queueStats.updateStats(list);
         }
     }
 
-    public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateFlowTableStats(TransactionAware transaction, List<FlowTableAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             flowTableStats.updateStats(list);
         }
     }
 
-    public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateNodeConnectorStats(TransactionAware transaction, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             nodeConnectorStats.updateStats(list);
         }
     }
 
-    public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
-        final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
+    public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
+        final Short tableId = msgManager.isExpectedTableTransaction(transaction);
         if (tableId != null) {
             final DataModificationTransaction trans = dps.beginTransaction();
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
@@ -224,8 +207,8 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
         }
     }
 
-    public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateFlowStats(TransactionAware transaction, List<FlowAndStatisticsMapList> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             flowStats.updateStats(list);
         }
     }
@@ -285,114 +268,63 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     public synchronized void requestPeriodicStatistics() {
         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
 
-        if (flowTableStats != null){
-            registerTransaction(flowTableStats.request(), StatsRequestType.ALL_FLOW);
-        }
-        if (flowStats != null){
-            // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
-            //        comes back -- we do not have any tables anyway.
-            final Collection<TableKey> tables = flowTableStats.getTables();
-            logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
-            for (final TableKey key : tables) {
-                logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
-                registerTableTransaction(flowStats.requestAggregateFlows(key),  key.getId());
-            }
+        flowTableStats.request();
 
-            registerTransaction(flowStats.requestAllFlowsAllTables(), StatsRequestType.ALL_FLOW);
+        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+        //        comes back -- we do not have any tables anyway.
+        final Collection<TableKey> tables = flowTableStats.getTables();
+        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
+        for (final TableKey key : tables) {
+            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
+            flowStats.requestAggregateFlows(key);
         }
 
-        if (nodeConnectorStats != null) {
-            registerTransaction(nodeConnectorStats.request(), StatsRequestType.ALL_PORT);
-        }
+        flowStats.requestAllFlowsAllTables();
+        nodeConnectorStats.request();
+        groupStats.request();
+        groupDescStats.request();
+        meterStats.request();
+        meterConfigStats.request();
+        queueStats.request();
+    }
 
-        if (groupStats != null) {
-            registerTransaction(groupStats.request(), StatsRequestType.ALL_GROUP);
-        }
-        sendGroupDescriptionRequest();
+    public synchronized void start(final Timer timer) {
+        flowStats.start(dps);
+        groupDescStats.start(dps);
+        groupStats.start(dps);
+        meterConfigStats.start(dps);
+        meterStats.start(dps);
+        queueStats.start(dps);
 
-        if (meterStats != null) {
-            registerTransaction(meterStats.request(), StatsRequestType.ALL_METER);
-        }
-        sendMeterConfigStatisticsRequest();
+        timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS);
 
-        if(queueStats != null) {
-            registerTransaction(queueStats.request(), StatsRequestType.ALL_QUEUE_STATS);
-        }
-    }
+        logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS);
 
-    public synchronized void start() {
         requestPeriodicStatistics();
     }
 
     @Override
     public synchronized void close() {
-        // FIXME: cleanup any resources we hold (registrations, etc.)
-        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
-    }
-
-    synchronized void sendFlowStatsFromTableRequest(Flow flow) {
-        if (flowStats == null) {
-            logger.debug("No Flow statistics service, not sending a request");
-            return;
-        }
-
-        registerTransaction(flowStats.requestFlow(flow), StatsRequestType.ALL_FLOW);
-    }
-
-    synchronized void sendGroupDescriptionRequest() {
-        if (groupStats == null) {
-            logger.debug("No Group Descriptor statistics service, not sending a request");
-            return;
-        }
-
-        registerTransaction(groupDescStats.request(), StatsRequestType.GROUP_DESC);
-    }
-
-    synchronized void sendMeterConfigStatisticsRequest() {
-        if (meterConfigStats == null) {
-            logger.debug("No Meter Config statistics service, not sending a request");
-            return;
-        }
-
-        registerTransaction(meterConfigStats.request(), StatsRequestType.METER_CONFIG);
-    }
+        task.cancel();
+        flowStats.close();
+        groupDescStats.close();
+        groupStats.close();
+        meterConfigStats.close();
+        meterStats.close();
+        queueStats.close();
 
-    synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) {
-        if (queueStats == null) {
-            logger.debug("No Queue statistics service, not sending a request");
-            return;
-        }
-
-        registerTransaction(queueStats.request(nodeConnectorId, queueId), StatsRequestType.ALL_QUEUE_STATS);
+        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
     }
 
-    private void registerTransaction(final ListenableFuture<TransactionId> future, final StatsRequestType type) {
-        Futures.addCallback(future, new FutureCallback<TransactionId>() {
-            @Override
-            public void onSuccess(TransactionId result) {
-                msgManager.recordExpectedTransaction(result, type);
-                logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
-            }
-        });
+    @Override
+    public void registerTransaction(TransactionId id) {
+        msgManager.recordExpectedTransaction(id);
+        logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey);
     }
 
-    private void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
-        Futures.addCallback(future, new FutureCallback<TransactionId>() {
-            @Override
-            public void onSuccess(TransactionId result) {
-                msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id);
-                logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
-            }
-        });
+    @Override
+    public void registerTableTransaction(final TransactionId id, final Short table) {
+        msgManager.recordExpectedTableTransaction(id, table);
+        logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table);
     }
 }