Statistics-Manager - Performance Improvement 85/5785/5
authorAnil Vishnoi <avishnoi@in.ibm.com>
Tue, 25 Mar 2014 21:49:09 +0000 (03:19 +0530)
committerAnil Vishnoi <avishnoi@in.ibm.com>
Mon, 28 Apr 2014 07:18:28 +0000 (12:48 +0530)
1) Introduced statistics request scheduler, to schedule request based on the current transaction load on MD-SAL DataStore.
Each node submit all individual statistics request to schedular for execution
2) Send statistics request if there is no MD-SAL trasaction pending.
It just keep tracks of the MD-SAL trasaction triggered by statistics-manager
3) Removal of stale statistics is now done based on counter rather then time values.
Time based removal will break in case of clustered environment.
4) Code clean up
Change-Id: Ie7522d0c60f2c7051dbfcdf9a6657843ef4da743
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
13 files changed:
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractListeningStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowTableStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupDescStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterConfigStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeConnectorStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java [new file with mode: 0644]

index 4a58579..167fb21 100644 (file)
@@ -20,8 +20,8 @@ abstract class AbstractListeningStatsTracker<I, K> extends AbstractStatsTracker<
     private static final Logger logger = LoggerFactory.getLogger(AbstractListeningStatsTracker.class);
     private ListenerRegistration<?> reg;
 
-    protected AbstractListeningStatsTracker(FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    protected AbstractListeningStatsTracker(FlowCapableContext context) {
+        super(context);
     }
 
     protected abstract InstanceIdentifier<?> listenPath();
index c29b6a7..e922656 100644 (file)
@@ -32,6 +32,9 @@ import com.google.common.util.concurrent.JdkFutureAdapters;
 
 abstract class AbstractStatsTracker<I, K> {
     private static final Logger logger = LoggerFactory.getLogger(AbstractStatsTracker.class);
+    
+    private static final int WAIT_FOR_REQUEST_CYCLE = 2;
+    
     private final FutureCallback<RpcResult<? extends TransactionAware>> callback =
             new FutureCallback<RpcResult<? extends TransactionAware>>() {
         @Override
@@ -62,11 +65,11 @@ abstract class AbstractStatsTracker<I, K> {
 
     private final Map<K, Long> trackedItems = new HashMap<>();
     private final FlowCapableContext context;
-    private final long lifetimeNanos;
+    private long requestCounter;
 
-    protected AbstractStatsTracker(final FlowCapableContext context, final long lifetimeNanos) {
+    protected AbstractStatsTracker(final FlowCapableContext context) {
         this.context = Preconditions.checkNotNull(context);
-        this.lifetimeNanos = lifetimeNanos;
+        this.requestCounter = 0;
     }
 
     protected final InstanceIdentifierBuilder<Node> getNodeIdentifierBuilder() {
@@ -89,24 +92,32 @@ abstract class AbstractStatsTracker<I, K> {
         return context.startDataModification();
     }
 
+    public final synchronized void increaseRequestCounter(){
+        this.requestCounter++;
+    }
     protected abstract void cleanupSingleStat(DataModificationTransaction trans, K item);
     protected abstract K updateSingleStat(DataModificationTransaction trans, I item);
+    public abstract void request();
 
     public final synchronized void updateStats(List<I> list) {
-        final Long expiryTime = System.nanoTime() + lifetimeNanos;
+
         final DataModificationTransaction trans = startTransaction();
 
         for (final I item : list) {
-            trackedItems.put(updateSingleStat(trans, item), expiryTime);
+            trackedItems.put(updateSingleStat(trans, item), requestCounter);
         }
 
         trans.commit();
     }
 
-    public final synchronized void cleanup(final DataModificationTransaction trans, long now) {
+    /**
+     * Statistics will be cleaned up if not update in last two request cycles.
+     * @param trans
+     */
+    public final synchronized void cleanup(final DataModificationTransaction trans) {
         for (Iterator<Entry<K, Long>> it = trackedItems.entrySet().iterator();it.hasNext();){
             Entry<K, Long> e = it.next();
-            if (now > e.getValue()) {
+            if (requestCounter >= e.getValue()+WAIT_FOR_REQUEST_CYCLE) {
                 cleanupSingleStat(trans, e.getKey());
                 it.remove();
             }
index 90ddc28..06d6e82 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.Collection;
 import java.util.Map.Entry;
 
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
@@ -37,12 +38,17 @@ import org.slf4j.LoggerFactory;
 final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
     private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
     private final OpendaylightFlowStatisticsService flowStatsService;
+    private FlowTableStatsTracker flowTableStats;
     private int unaccountedFlowsCounter = 1;
 
-    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
+        super(context);
         this.flowStatsService = flowStatsService;
     }
+    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, FlowTableStatsTracker flowTableStats) {
+        this(flowStatsService, context);
+        this.flowTableStats = flowTableStats;
+    }
 
     @Override
     protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
@@ -203,6 +209,20 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
         return "Flow";
     }
 
+    @Override
+    public void request() {
+        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+        //        comes back -- we do not have any tables anyway.
+        final Collection<TableKey> tables = flowTableStats.getTables();
+        logger.debug("Node {} supports {} table(s)", this.getNodeRef(), tables.size());
+        for (final TableKey key : tables) {
+            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), this.getNodeRef());
+            this.requestAggregateFlows(key);
+        }
+
+        this.requestAllFlowsAllTables();
+        
+    }
     public void requestAllFlowsAllTables() {
         if (flowStatsService != null) {
             final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
index 3fe68c1..a160f6d 100644 (file)
@@ -30,8 +30,8 @@ final class FlowTableStatsTracker extends AbstractStatsTracker<FlowTableAndStati
     private final Set<TableKey> tables = Collections.unmodifiableSet(privateTables);
     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
 
-    FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context) {
+        super(context);
         this.flowTableStatsService = flowTableStatsService;
     }
 
@@ -61,6 +61,7 @@ final class FlowTableStatsTracker extends AbstractStatsTracker<FlowTableAndStati
         return item;
     }
 
+    @Override
     public void request() {
         if (flowTableStatsService != null) {
             final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
index 8aebd6b..e180aaf 100644 (file)
@@ -29,8 +29,8 @@ final class GroupDescStatsTracker extends AbstractListeningStatsTracker<GroupDes
     private static final Logger logger = LoggerFactory.getLogger(GroupDescStatsTracker.class);
     private final OpendaylightGroupStatisticsService groupStatsService;
 
-    public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context, final long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context) {
+        super(context);
         this.groupStatsService = groupStatsService;
     }
 
@@ -70,6 +70,7 @@ final class GroupDescStatsTracker extends AbstractListeningStatsTracker<GroupDes
         return "Group Descriptor";
     }
 
+    @Override
     public void request() {
         if (groupStatsService != null) {
             final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
index 1af8e4e..9735fea 100644 (file)
@@ -31,8 +31,8 @@ final class GroupStatsTracker extends AbstractListeningStatsTracker<GroupStats,
     private static final Logger logger = LoggerFactory.getLogger(GroupStatsTracker.class);
     private final OpendaylightGroupStatisticsService groupStatsService;
 
-    GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context) {
+        super(context);
         this.groupStatsService = Preconditions.checkNotNull(groupStatsService);
     }
 
@@ -72,6 +72,7 @@ final class GroupStatsTracker extends AbstractListeningStatsTracker<GroupStats,
         return "Group";
     }
 
+    @Override
     public void request() {
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
         input.setNode(getNodeRef());
index 4b95925..e8b7fb7 100644 (file)
@@ -29,8 +29,8 @@ final class MeterConfigStatsTracker extends AbstractListeningStatsTracker<MeterC
     private static final Logger logger = LoggerFactory.getLogger(MeterConfigStatsTracker.class);
     private final OpendaylightMeterStatisticsService meterStatsService;
 
-    protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+        super(context);
         this.meterStatsService = meterStatsService;
     }
 
@@ -62,6 +62,7 @@ final class MeterConfigStatsTracker extends AbstractListeningStatsTracker<MeterC
         return item;
     }
 
+    @Override
     public void request() {
         if (meterStatsService != null) {
             GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
index 091cbcc..b373020 100644 (file)
@@ -29,8 +29,8 @@ final class MeterStatsTracker extends AbstractListeningStatsTracker<MeterStats,
     private static final Logger logger = LoggerFactory.getLogger(MeterStatsTracker.class);
     private final OpendaylightMeterStatisticsService meterStatsService;
 
-    MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+        super(context);
         this.meterStatsService = meterStatsService;
     }
 
@@ -61,6 +61,7 @@ final class MeterStatsTracker extends AbstractListeningStatsTracker<MeterStats,
         return item;
     }
 
+    @Override
     public void request() {
         if (meterStatsService != null) {
             GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
index 00bd274..701911d 100644 (file)
@@ -25,8 +25,8 @@ final class NodeConnectorStatsTracker extends AbstractStatsTracker<NodeConnector
     private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsTracker.class);
     private final OpendaylightPortStatisticsService portStatsService;
 
-    NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context) {
+        super(context);
         this.portStatsService = portStatsService;
     }
 
@@ -73,6 +73,7 @@ final class NodeConnectorStatsTracker extends AbstractStatsTracker<NodeConnector
         return item;
     }
 
+    @Override
     public void request() {
         if (portStatsService != null) {
             final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
index 5ace260..dbcbab9 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -73,6 +72,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
     private final MultipartMessageManager msgManager;
+    private final StatisticsRequestScheduler srScheduler;
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -103,23 +103,25 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
             final OpendaylightGroupStatisticsService groupStatsService,
             final OpendaylightMeterStatisticsService meterStatsService,
             final OpendaylightPortStatisticsService portStatsService,
-            final OpendaylightQueueStatisticsService queueStatsService) {
+            final OpendaylightQueueStatisticsService queueStatsService, 
+            final StatisticsRequestScheduler srScheduler) {
         this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
+        this.srScheduler = Preconditions.checkNotNull(srScheduler);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
 
         final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
 
         msgManager = new MultipartMessageManager(lifetimeNanos);
-        flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
-        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
-        groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
-        groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
-        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
-        meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
-        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
-        queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this);
+        flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats);
+        groupDescStats = new GroupDescStatsTracker(groupStatsService, this);
+        groupStats = new GroupStatsTracker(groupStatsService, this);
+        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this);
+        meterStats = new MeterStatsTracker(meterStatsService, this);
+        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this);
+        queueStats = new QueueStatsTracker(queueStatsService, this);
     }
 
     public NodeKey getTargetNodeKey() {
@@ -138,7 +140,9 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
 
     @Override
     public DataModificationTransaction startDataModification() {
-        return dps.beginTransaction();
+        DataModificationTransaction dmt = dps.beginTransaction();
+        dmt.registerListener(this.srScheduler);
+        return dmt;
     }
 
     public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
@@ -186,7 +190,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
         final Short tableId = msgManager.isExpectedTableTransaction(transaction);
         if (tableId != null) {
-            final DataModificationTransaction trans = dps.beginTransaction();
+            final DataModificationTransaction trans = this.startDataModification();
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -214,7 +218,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -232,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void updateMeterFeatures(MeterFeatures features) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -250,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     }
 
     public synchronized void cleanStaleStatistics() {
-        final DataModificationTransaction trans = dps.beginTransaction();
-        final long now = System.nanoTime();
-
-        flowStats.cleanup(trans, now);
-        groupDescStats.cleanup(trans, now);
-        groupStats.cleanup(trans, now);
-        meterConfigStats.cleanup(trans, now);
-        meterStats.cleanup(trans, now);
-        nodeConnectorStats.cleanup(trans, now);
-        queueStats.cleanup(trans, now);
+        final DataModificationTransaction trans = this.startDataModification();
+
+        flowStats.cleanup(trans);
+        groupDescStats.cleanup(trans);
+        groupStats.cleanup(trans);
+        meterConfigStats.cleanup(trans);
+        meterStats.cleanup(trans);
+        nodeConnectorStats.cleanup(trans);
+        queueStats.cleanup(trans);
         msgManager.cleanStaleTransactionIds();
 
         trans.commit();
@@ -268,26 +271,23 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo
     public synchronized void requestPeriodicStatistics() {
         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
 
-        flowTableStats.request();
-
-        // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
-        //        comes back -- we do not have any tables anyway.
-        final Collection<TableKey> tables = flowTableStats.getTables();
-        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
-        for (final TableKey key : tables) {
-            logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
-            flowStats.requestAggregateFlows(key);
-        }
-
-        flowStats.requestAllFlowsAllTables();
-        nodeConnectorStats.request();
-        groupStats.request();
-        groupDescStats.request();
-        meterStats.request();
-        meterConfigStats.request();
-        queueStats.request();
+        this.srScheduler.addRequestToSchedulerQueue(flowTableStats);
+
+        this.srScheduler.addRequestToSchedulerQueue(flowStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupDescStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterConfigStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(queueStats);
     }
-
+    
     public synchronized void start(final Timer timer) {
         flowStats.start(dps);
         groupDescStats.start(dps);
index f187c70..6f93eeb 100644 (file)
@@ -36,8 +36,8 @@ final class QueueStatsTracker extends AbstractListeningStatsTracker<QueueIdAndSt
     private static final Logger logger = LoggerFactory.getLogger(QueueStatsTracker.class);
     private final OpendaylightQueueStatisticsService queueStatsService;
 
-    QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context, long lifetimeNanos) {
-        super(context, lifetimeNanos);
+    QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context) {
+        super(context);
         this.queueStatsService = queueStatsService;
     }
 
@@ -82,6 +82,7 @@ final class QueueStatsTracker extends AbstractListeningStatsTracker<QueueIdAndSt
         return queueEntry;
     }
 
+    @Override
     public void request() {
         if (queueStatsService != null) {
             GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
index 892d304..d8bea7c 100644 (file)
@@ -66,9 +66,12 @@ public class StatisticsProvider implements AutoCloseable {
     private OpendaylightFlowTableStatisticsService flowTableStatsService;
 
     private OpendaylightQueueStatisticsService queueStatsService;
+    
+    private final StatisticsRequestScheduler srScheduler;
 
     public StatisticsProvider(final DataProviderService dataService) {
         this.dps = Preconditions.checkNotNull(dataService);
+        this.srScheduler = new StatisticsRequestScheduler();
     }
 
     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
@@ -86,7 +89,8 @@ public class StatisticsProvider implements AutoCloseable {
         portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
-
+        this.srScheduler.start();
+        
         // Start receiving notifications
         this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
 
@@ -144,7 +148,7 @@ public class StatisticsProvider implements AutoCloseable {
 
             final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key,
                     flowStatsService, flowTableStatsService, groupStatsService,
-                    meterStatsService, portStatsService, queueStatsService);
+                    meterStatsService, portStatsService, queueStatsService,srScheduler);
             final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
             if (old == null) {
                 spLogger.debug("Started node handler for {}", key.getId());
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsRequestScheduler.java
new file mode 100644 (file)
index 0000000..9ebfd6f
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main responsibility of the class is to check the MD-SAL data store read/write
+ * transaction accumulation level and send statistics request if number of pending 
+ * read/write transactions are zero.
+ * @author avishnoi@in.ibm.com
+ *
+ */
+@SuppressWarnings("rawtypes")
+public class StatisticsRequestScheduler implements DataTransactionListener {
+
+    private static final Logger srsLogger = LoggerFactory.getLogger(StatisticsRequestScheduler.class);
+    private final Timer timer = new Timer("request-monitor", true);
+
+    // We need ordered retrieval, and O(1) contains operation
+    private final Map<AbstractStatsTracker,Integer> requestQueue = 
+            Collections.synchronizedMap(new LinkedHashMap<AbstractStatsTracker,Integer>());
+    
+    private Long PendingTransactions;
+    
+    private long lastRequestTime = System.nanoTime();
+    
+    private static final long REQUEST_MONITOR_INTERVAL = 1000;
+    
+    private final TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+            long now = System.nanoTime();
+            if(now > lastRequestTime+TimeUnit.MILLISECONDS.toNanos(REQUEST_MONITOR_INTERVAL)){
+                requestStatistics();
+            }
+        }
+    };
+
+    public StatisticsRequestScheduler(){
+        PendingTransactions = (long) 0;
+    }
+    
+    public void addRequestToSchedulerQueue(AbstractStatsTracker statsRequest){
+        requestQueue.put(statsRequest, null);
+    }
+    
+    public AbstractStatsTracker getNextRequestFromSchedulerQueue(){
+        //Remove first element
+        AbstractStatsTracker stats = null;
+        synchronized(requestQueue){
+            Iterator<Map.Entry<AbstractStatsTracker, Integer>> nodesItr = requestQueue.entrySet().iterator();
+            if(nodesItr.hasNext()){
+                stats = nodesItr.next().getKey();
+                srsLogger.debug("{} chosen up for execution",stats.getNodeRef());
+                nodesItr.remove();
+                return stats;
+            }
+        }
+        return stats;
+    }
+
+    private void requestStatistics(){
+        AbstractStatsTracker stats = this.getNextRequestFromSchedulerQueue();
+        if(stats != null) {
+            stats.request();
+            stats.increaseRequestCounter();
+        }
+    }
+    @Override
+    public void onStatusUpdated(DataModificationTransaction transaction, TransactionStatus status) {
+        
+        AbstractStatsTracker stats = null;
+        synchronized(PendingTransactions){
+            switch(status){
+            case SUBMITED:
+                this.PendingTransactions++;
+                break;
+            case COMMITED:
+            case FAILED:
+                this.PendingTransactions--;
+                if(PendingTransactions == 0){
+                    lastRequestTime = System.nanoTime();
+                    stats = this.getNextRequestFromSchedulerQueue();
+                }
+                srsLogger.debug("Pending MD-SAL transactions : {} & Scheduler queue size : {}",this.PendingTransactions,this.requestQueue.size());
+                break;
+            default:
+                break;
+            }
+        }
+        if(stats != null){
+            stats.request();
+            stats.increaseRequestCounter();
+        }
+    }
+    
+    public void start(){
+        timer.schedule(task, 0, REQUEST_MONITOR_INTERVAL);
+    }
+}