Statistics-Manager - Performance Improvement 85/5785/5
authorAnil Vishnoi <avishnoi@in.ibm.com>
Tue, 25 Mar 2014 21:49:09 +0000 (03:19 +0530)
committerAnil Vishnoi <avishnoi@in.ibm.com>
Mon, 28 Apr 2014 07:18:28 +0000 (12:48 +0530)
1) Introduced statistics request scheduler, to schedule request based on the current transaction load on MD-SAL DataStore.
Each node submit all individual statistics request to schedular for execution
2) Send statistics request if there is no MD-SAL trasaction pending.
It just keep tracks of the MD-SAL trasaction triggered by statistics-manager
3) Removal of stale statistics is now done based on counter rather then time values.
Time based removal will break in case of clustered environment.
4) Code clean up
Change-Id: Ie7522d0c60f2c7051dbfcdf9a6657843ef4da743
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
13 files changed:
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractListeningStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowTableStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupDescStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterConfigStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeConnectorStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java [new file with mode: 0644]

index 4a58579b13fd542936f876ddad48c7ebaabcb8a5..167fb21ffdc763f6cda840fc42b6b1e6aed03c7f 100644 (file)
@@ -20,8 +20,8 @@ abstract class AbstractListeningStatsTracker<I, K> extends AbstractStatsTracker<
     private static final Logger logger = LoggerFactory.getLogger(AbstractListeningStatsTracker.class);
     private ListenerRegistration<?> reg;
 
-    protected AbstractListeningStatsTracker(FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    protected AbstractListeningStatsTracker(FlowCapableContext context) {
+        super(context);
     }
 
     protected abstract InstanceIdentifier<?> listenPath();
index c29b6a7730ced4f24374d2e4e67a650f3ed69293..e922656d919f5726de7fe19d743458a26f08a531 100644 (file)
@@ -32,6 +32,9 @@ import com.google.common.util.concurrent.JdkFutureAdapters;
 
 abstract class AbstractStatsTracker<I, K> {
     private static final Logger logger = LoggerFactory.getLogger(AbstractStatsTracker.class);
+    
+    private static final int WAIT_FOR_REQUEST_CYCLE = 2;
+    
     private final FutureCallback<RpcResult<? extends TransactionAware>> callback =
             new FutureCallback<RpcResult<? extends TransactionAware>>() {
         @Override
@@ -62,11 +65,11 @@ abstract class AbstractStatsTracker<I, K> {
 
     private final Map<K, Long> trackedItems = new HashMap<>();
     private final FlowCapableContext context;
-    private final long lifetimeNanos;
+    private long requestCounter;
 
-    protected AbstractStatsTracker(final FlowCapableContext context, final long lifetimeNanos) {
+    protected AbstractStatsTracker(final FlowCapableContext context) {
         this.context = Preconditions.checkNotNull(context);
-        this.lifetimeNanos = lifetimeNanos;
+        this.requestCounter = 0;
     }
 
     protected final InstanceIdentifierBuilder<Node> getNodeIdentifierBuilder() {
@@ -89,24 +92,32 @@ abstract class AbstractStatsTracker<I, K> {
         return context.startDataModification();
     }
 
+    public final synchronized void increaseRequestCounter(){
+        this.requestCounter++;
+    }
     protected abstract void cleanupSingleStat(DataModificationTransaction trans, K item);
     protected abstract K updateSingleStat(DataModificationTransaction trans, I item);
+    public abstract void request();
 
     public final synchronized void updateStats(List<I> list) {
-        final Long expiryTime = System.nanoTime() + lifetimeNanos;
+
         final DataModificationTransaction trans = startTransaction();
 
         for (final I item : list) {
-            trackedItems.put(updateSingleStat(trans, item), expiryTime);
+            trackedItems.put(updateSingleStat(trans, item), requestCounter);
         }
 
         trans.commit();
     }
 
-    public final synchronized void cleanup(final DataModificationTransaction trans, long now) {
+    /**
+     * Statistics will be cleaned up if not update in last two request cycles.
+     * @param trans
+     */
+    public final synchronized void cleanup(final DataModificationTransaction trans) {
         for (Iterator<Entry<K, Long>> it = trackedItems.entrySet().iterator();it.hasNext();){
             Entry<K, Long> e = it.next();
-            if (now > e.getValue()) {
+            if (requestCounter >= e.getValue()+WAIT_FOR_REQUEST_CYCLE) {
                 cleanupSingleStat(trans, e.getKey());
                 it.remove();
             }
index 90ddc28acd0066e72e8e134a49c0a786b6f49b4b..06d6e821122617aa1642e3cfd6a5d46808f8518a 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.Collection;
 import java.util.Map.Entry;
 
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
@@ -37,12 +38,17 @@ import org.slf4j.LoggerFactory;
 final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
     private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
     private final OpendaylightFlowStatisticsService flowStatsService;
+    private FlowTableStatsTracker flowTableStats;
     private int unaccountedFlowsCounter = 1;
 
-    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
+        super(context);
         this.flowStatsService = flowStatsService;
     }
+    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, FlowTableStatsTracker flowTableStats) {
+        this(flowStatsService, context);
+        this.flowTableStats = flowTableStats;
+    }
 
     @Override
     protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
@@ -203,6 +209,20 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
         return "Flow";
     }
 
+    @Override
+    public void request() {
+        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+        //        comes back -- we do not have any tables anyway.
+        final Collection<TableKey> tables = flowTableStats.getTables();
+        logger.debug("Node {} supports {} table(s)", this.getNodeRef(), tables.size());
+        for (final TableKey key : tables) {
+            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), this.getNodeRef());
+            this.requestAggregateFlows(key);
+        }
+
+        this.requestAllFlowsAllTables();
+        
+    }
     public void requestAllFlowsAllTables() {
         if (flowStatsService != null) {
             final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
index 3fe68c111a1549bc6c9df7b9b19803590ffad94b..a160f6d467ca6731a8c4d485250d6697cec8b97e 100644 (file)
@@ -30,8 +30,8 @@ final class FlowTableStatsTracker extends AbstractStatsTracker<FlowTableAndStati
     private final Set<TableKey> tables = Collections.unmodifiableSet(privateTables);
     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
 
-    FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context) {
+        super(context);
         this.flowTableStatsService = flowTableStatsService;
     }
 
@@ -61,6 +61,7 @@ final class FlowTableStatsTracker extends AbstractStatsTracker<FlowTableAndStati
         return item;
     }
 
+    @Override
     public void request() {
         if (flowTableStatsService != null) {
             final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
index 8aebd6b2491a00ce27bfb8e1093517029ea87cf3..e180aaf5fc94de8b609bcab6803843c36da2fac5 100644 (file)
@@ -29,8 +29,8 @@ final class GroupDescStatsTracker extends AbstractListeningStatsTracker<GroupDes
     private static final Logger logger = LoggerFactory.getLogger(GroupDescStatsTracker.class);
     private final OpendaylightGroupStatisticsService groupStatsService;
 
-    public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context, final long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context) {
+        super(context);
         this.groupStatsService = groupStatsService;
     }
 
@@ -70,6 +70,7 @@ final class GroupDescStatsTracker extends AbstractListeningStatsTracker<GroupDes
         return "Group Descriptor";
     }
 
+    @Override
     public void request() {
         if (groupStatsService != null) {
             final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
index 1af8e4e9f1a2cf891795d06f967ca0b12307c730..9735fea069325d7f12a87e02563a9afb24205ccf 100644 (file)
@@ -31,8 +31,8 @@ final class GroupStatsTracker extends AbstractListeningStatsTracker<GroupStats,
     private static final Logger logger = LoggerFactory.getLogger(GroupStatsTracker.class);
     private final OpendaylightGroupStatisticsService groupStatsService;
 
-    GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context) {
+        super(context);
         this.groupStatsService = Preconditions.checkNotNull(groupStatsService);
     }
 
@@ -72,6 +72,7 @@ final class GroupStatsTracker extends AbstractListeningStatsTracker<GroupStats,
         return "Group";
     }
 
+    @Override
     public void request() {
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
         input.setNode(getNodeRef());
index 4b9592570565fc94d423702a7cff9923d110895f..e8b7fb7164d6d749a4b2427bc9ce591f4f0a5b54 100644 (file)
@@ -29,8 +29,8 @@ final class MeterConfigStatsTracker extends AbstractListeningStatsTracker<MeterC
     private static final Logger logger = LoggerFactory.getLogger(MeterConfigStatsTracker.class);
     private final OpendaylightMeterStatisticsService meterStatsService;
 
-    protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+        super(context);
         this.meterStatsService = meterStatsService;
     }
 
@@ -62,6 +62,7 @@ final class MeterConfigStatsTracker extends AbstractListeningStatsTracker<MeterC
         return item;
     }
 
+    @Override
     public void request() {
         if (meterStatsService != null) {
             GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
index 091cbcc1a078f88d54cde3eaa9300da97aaed8c2..b373020f1ca26cbd2e08faa7b033c65731344764 100644 (file)
@@ -29,8 +29,8 @@ final class MeterStatsTracker extends AbstractListeningStatsTracker<MeterStats,
     private static final Logger logger = LoggerFactory.getLogger(MeterStatsTracker.class);
     private final OpendaylightMeterStatisticsService meterStatsService;
 
-    MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+        super(context);
         this.meterStatsService = meterStatsService;
     }
 
@@ -61,6 +61,7 @@ final class MeterStatsTracker extends AbstractListeningStatsTracker<MeterStats,
         return item;
     }
 
+    @Override
     public void request() {
         if (meterStatsService != null) {
             GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
index 00bd27402fc3c45c82f0f9d06c8a5a562ea76a97..701911d9a2c307c03ecd48c75bfcd4099ba2490f 100644 (file)
@@ -25,8 +25,8 @@ final class NodeConnectorStatsTracker extends AbstractStatsTracker<NodeConnector
     private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsTracker.class);
     private final OpendaylightPortStatisticsService portStatsService;
 
-    NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context) {
+        super(context);
         this.portStatsService = portStatsService;
     }
 
@@ -73,6 +73,7 @@ final class NodeConnectorStatsTracker extends AbstractStatsTracker<NodeConnector
         return item;
     }
 
+    @Override
     public void request() {
         if (portStatsService != null) {
             final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
index 5ace260251673bb9fa42b9da14a95a5ece724b4c..dbcbab982a9aec997e37a2fb09e763bb3f3c5f96 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -73,6 +72,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
     private final MultipartMessageManager msgManager;
+    private final StatisticsRequestScheduler srScheduler;
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -103,23 +103,25 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
             final OpendaylightGroupStatisticsService groupStatsService,
             final OpendaylightMeterStatisticsService meterStatsService,
             final OpendaylightPortStatisticsService portStatsService,
-            final OpendaylightQueueStatisticsService queueStatsService) {
+            final OpendaylightQueueStatisticsService queueStatsService, 
+            final StatisticsRequestScheduler srScheduler) {
         this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
+        this.srScheduler = Preconditions.checkNotNull(srScheduler);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
 
         final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
 
         msgManager = new MultipartMessageManager(lifetimeNanos);
-        flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
-        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
-        groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
-        groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
-        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
-        meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
-        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
-        queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this);
+        flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats);
+        groupDescStats = new GroupDescStatsTracker(groupStatsService, this);
+        groupStats = new GroupStatsTracker(groupStatsService, this);
+        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this);
+        meterStats = new MeterStatsTracker(meterStatsService, this);
+        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this);
+        queueStats = new QueueStatsTracker(queueStatsService, this);
     }
 
     public NodeKey getTargetNodeKey() {
@@ -138,7 +140,9 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
 
     @Override
     public DataModificationTransaction startDataModification() {
-        return dps.beginTransaction();
+        DataModificationTransaction dmt = dps.beginTransaction();
+        dmt.registerListener(this.srScheduler);
+        return dmt;
     }
 
     public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
@@ -186,7 +190,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
         final Short tableId = msgManager.isExpectedTableTransaction(transaction);
         if (tableId != null) {
-            final DataModificationTransaction trans = dps.beginTransaction();
+            final DataModificationTransaction trans = this.startDataModification();
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -214,7 +218,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -232,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void updateMeterFeatures(MeterFeatures features) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -250,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void cleanStaleStatistics() {
-        final DataModificationTransaction trans = dps.beginTransaction();
-        final long now = System.nanoTime();
-
-        flowStats.cleanup(trans, now);
-        groupDescStats.cleanup(trans, now);
-        groupStats.cleanup(trans, now);
-        meterConfigStats.cleanup(trans, now);
-        meterStats.cleanup(trans, now);
-        nodeConnectorStats.cleanup(trans, now);
-        queueStats.cleanup(trans, now);
+        final DataModificationTransaction trans = this.startDataModification();
+
+        flowStats.cleanup(trans);
+        groupDescStats.cleanup(trans);
+        groupStats.cleanup(trans);
+        meterConfigStats.cleanup(trans);
+        meterStats.cleanup(trans);
+        nodeConnectorStats.cleanup(trans);
+        queueStats.cleanup(trans);
         msgManager.cleanStaleTransactionIds();
 
         trans.commit();
@@ -268,26 +271,23 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     public synchronized void requestPeriodicStatistics() {
         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
 
-        flowTableStats.request();
-
-        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
-        //        comes back -- we do not have any tables anyway.
-        final Collection<TableKey> tables = flowTableStats.getTables();
-        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
-        for (final TableKey key : tables) {
-            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
-            flowStats.requestAggregateFlows(key);
-        }
-
-        flowStats.requestAllFlowsAllTables();
-        nodeConnectorStats.request();
-        groupStats.request();
-        groupDescStats.request();
-        meterStats.request();
-        meterConfigStats.request();
-        queueStats.request();
+        this.srScheduler.addRequestToSchedulerQueue(flowTableStats);
+
+        this.srScheduler.addRequestToSchedulerQueue(flowStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupDescStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterConfigStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(queueStats);
     }
-
+    
     public synchronized void start(final Timer timer) {
         flowStats.start(dps);
         groupDescStats.start(dps);
index f187c7082e6e3b2093e2e0b0654a406dd9315950..6f93eeb6172dcdaf24db8301214abe12d92857e0 100644 (file)
@@ -36,8 +36,8 @@ final class QueueStatsTracker extends AbstractListeningStatsTracker<QueueIdAndSt
     private static final Logger logger = LoggerFactory.getLogger(QueueStatsTracker.class);
     private final OpendaylightQueueStatisticsService queueStatsService;
 
-    QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context) {
+        super(context);
         this.queueStatsService = queueStatsService;
     }
 
@@ -82,6 +82,7 @@ final class QueueStatsTracker extends AbstractListeningStatsTracker<QueueIdAndSt
         return queueEntry;
     }
 
+    @Override
     public void request() {
         if (queueStatsService != null) {
             GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
index 892d304daa8d6cd20c7010082f2698cb8de90266..d8bea7c63a4606fffaed51a9dce5fe889dca2d36 100644 (file)
@@ -66,9 +66,12 @@ public class StatisticsProvider implements AutoCloseable {
     private OpendaylightFlowTableStatisticsService flowTableStatsService;
 
     private OpendaylightQueueStatisticsService queueStatsService;
+    
+    private final StatisticsRequestScheduler srScheduler;
 
     public StatisticsProvider(final DataProviderService dataService) {
         this.dps = Preconditions.checkNotNull(dataService);
+        this.srScheduler = new StatisticsRequestScheduler();
     }
 
     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
@@ -86,7 +89,8 @@ public class StatisticsProvider implements AutoCloseable {
         portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
-
+        this.srScheduler.start();
+        
         // Start receiving notifications
         this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
 
@@ -144,7 +148,7 @@ public class StatisticsProvider implements AutoCloseable {
 
             final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key,
                     flowStatsService, flowTableStatsService, groupStatsService,
-                    meterStatsService, portStatsService, queueStatsService);
+                    meterStatsService, portStatsService, queueStatsService,srScheduler);
             final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
             if (old == null) {
                 spLogger.debug("Started node handler for {}", key.getId());
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java
new file mode 100644 (file)
index 0000000..9ebfd6f
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main responsibility of the class is to check the MD-SAL data store read/write
+ * transaction accumulation level and send statistics request if number of pending 
+ * read/write transactions are zero.
+ * @author avishnoi@in.ibm.com
+ *
+ */
+@SuppressWarnings("rawtypes")
+public class StatisticsRequestScheduler implements DataTransactionListener {
+
+    private static final Logger srsLogger = LoggerFactory.getLogger(StatisticsRequestScheduler.class);
+    private final Timer timer = new Timer("request-monitor", true);
+
+    // We need ordered retrieval, and O(1) contains operation
+    private final Map<AbstractStatsTracker,Integer> requestQueue = 
+            Collections.synchronizedMap(new LinkedHashMap<AbstractStatsTracker,Integer>());
+    
+    private Long PendingTransactions;
+    
+    private long lastRequestTime = System.nanoTime();
+    
+    private static final long REQUEST_MONITOR_INTERVAL = 1000;
+    
+    private final TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+            long now = System.nanoTime();
+            if(now > lastRequestTime+TimeUnit.MILLISECONDS.toNanos(REQUEST_MONITOR_INTERVAL)){
+                requestStatistics();
+            }
+        }
+    };
+
+    public StatisticsRequestScheduler(){
+        PendingTransactions = (long) 0;
+    }
+    
+    public void addRequestToSchedulerQueue(AbstractStatsTracker statsRequest){
+        requestQueue.put(statsRequest, null);
+    }
+    
+    public AbstractStatsTracker getNextRequestFromSchedulerQueue(){
+        //Remove first element
+        AbstractStatsTracker stats = null;
+        synchronized(requestQueue){
+            Iterator<Map.Entry<AbstractStatsTracker, Integer>> nodesItr = requestQueue.entrySet().iterator();
+            if(nodesItr.hasNext()){
+                stats = nodesItr.next().getKey();
+                srsLogger.debug("{} chosen up for execution",stats.getNodeRef());
+                nodesItr.remove();
+                return stats;
+            }
+        }
+        return stats;
+    }
+
+    private void requestStatistics(){
+        AbstractStatsTracker stats = this.getNextRequestFromSchedulerQueue();
+        if(stats != null) {
+            stats.request();
+            stats.increaseRequestCounter();
+        }
+    }
+    @Override
+    public void onStatusUpdated(DataModificationTransaction transaction, TransactionStatus status) {
+        
+        AbstractStatsTracker stats = null;
+        synchronized(PendingTransactions){
+            switch(status){
+            case SUBMITED:
+                this.PendingTransactions++;
+                break;
+            case COMMITED:
+            case FAILED:
+                this.PendingTransactions--;
+                if(PendingTransactions == 0){
+                    lastRequestTime = System.nanoTime();
+                    stats = this.getNextRequestFromSchedulerQueue();
+                }
+                srsLogger.debug("Pending MD-SAL transactions : {} & Scheduler queue size : {}",this.PendingTransactions,this.requestQueue.size());
+                break;
+            default:
+                break;
+            }
+        }
+        if(stats != null){
+            stats.request();
+            stats.increaseRequestCounter();
+        }
+    }
+    
+    public void start(){
+        timer.schedule(task, 0, REQUEST_MONITOR_INTERVAL);
+    }
+}