Fix bug 2450 - Statistics collection slow - performance 28/13328/2
authorVaclav Demcak <vdemcak@cisco.com>
Tue, 2 Dec 2014 10:36:34 +0000 (11:36 +0100)
committerVaclav Demcak <vaclav.demcak@pantheon.sk>
Wed, 3 Dec 2014 02:24:52 +0000 (02:24 +0000)
* fix timeout value for statWaiter to notification (30 sec is mistake - 3sec is correct value)
* add check TransactionId for every notification (prevent unexpected notification for collecting next statistics)
* timeout has to clear TransactionId (prevention for notification from slower statistics processes

* patch 3  - revert the log level msg (debuging issue in StatPermCollectorImpl)
- change an expiration calculation for cached RPC results (StatRpcMsgManagerImpl)
- fix conditions for call notifyToCollectNextStat (Meter, Group)

succesfull tested for karaf-compatible

Change-Id: I54d7fe9e5c1a5d265c9378507fce1163691b62e5
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
13 files changed:
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatPermCollector.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatRpcMsgManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractNotifyCommit.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitGroup.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitMeter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitPort.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java

index 16ad28dd4fa9f439e201e59a9e66ff205c809d18..94d6dfa6515b76857358c98a2b4d8217ebfc4ecc 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.md.statistics.manager;
 
 import java.util.List;
 
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
@@ -112,7 +113,7 @@ public interface StatPermCollector extends Runnable, AutoCloseable {
      * It is call from collecting allStatistics methods as a future result for
      * Operational/DS statistic store call (does not matter in the outcome).
      */
-    void collectNextStatistics();
+    void collectNextStatistics(TransactionId xid);
 
     /**
      * Method returns true if collector has registered some active nodes
index 0576c2a64541aa6b2c90099cbd299967af74252d..62319ad5946cb561393bb7d1e1517429ade57967 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * statistics-manager
@@ -77,7 +78,8 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param future - result every Device RPC call
      */
-    <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(Future<RpcResult<T>> future, D inputObj, NodeRef ref);
+    <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
+            Future<RpcResult<T>> future, D inputObj, NodeRef ref, SettableFuture<TransactionId> resultTransId);
 
     /**
      * Method adds Notification which is marked as Multipart to the transaction cash
@@ -104,7 +106,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllGroupsStat(NodeRef nodeRef);
+    Future<TransactionId> getAllGroupsStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightGroupStatisticsService.getGroupDescription
@@ -112,7 +114,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllGroupsConfStats(NodeRef nodeRef);
+    Future<TransactionId> getAllGroupsConfStats(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightMeterStatisticsService.getGroupFeatures
@@ -128,7 +130,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllMetersStat(NodeRef nodeRef);
+    Future<TransactionId> getAllMetersStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightMeterStatisticsService.getAllMeterConfigStatistics
@@ -136,7 +138,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllMeterConfigStat(NodeRef nodeRef);
+    Future<TransactionId> getAllMeterConfigStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightMeterStatisticsService.getMeterFeatures
@@ -152,7 +154,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllFlowsStat(NodeRef nodeRef);
+    Future<TransactionId> getAllFlowsStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightFlowStatisticsService.getAggregateFlowStatisticsFromFlowTableForAllFlows
@@ -169,7 +171,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllPortsStat(NodeRef nodeRef);
+    Future<TransactionId> getAllPortsStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightFlowTableStatisticsService.getFlowTablesStatistics
@@ -177,7 +179,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllTablesStat(NodeRef nodeRef);
+    Future<TransactionId> getAllTablesStat(NodeRef nodeRef);
 
     /**
      * Method wraps OpendaylightQueueStatisticsService.getAllQueuesStatisticsFromAllPorts
@@ -185,7 +187,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable {
      *
      * @param NodeRef nodeRef
      */
-    void getAllQueueStat(NodeRef nodeRef);
+    Future<TransactionId> getAllQueueStat(NodeRef nodeRef);
 
 }
 
index 831dc224d113ec364257e57a292d98b24b908cc3..751a68965dc69af4963cf8bd1197cce9590f056d 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.me
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
@@ -97,7 +98,7 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
       *
       * @param nodeIdent
       */
-     void collectNextStatistics(InstanceIdentifier<Node> nodeIdent);
+     void collectNextStatistics(InstanceIdentifier<Node> nodeIdent, TransactionId xid);
 
      /**
       * Method wraps {@link StatPermCollector}.connectedNodeRegistration to provide
index 6bc6a30f8fb806ebe9e3b15473f7160ccba7670d..3f0e5e430e5db9c8c72098241e2d6e6fa513812f 100644 (file)
@@ -84,9 +84,9 @@ public abstract class StatAbstractNotifyCommit<N extends NotificationListener> i
         return manager.isProvidedFlowNodeActive(nodeIdent);
     }
 
-    protected void notifyToCollectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
+    protected void notifyToCollectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
         Preconditions.checkNotNull(nodeIdent, "FlowCapableNode ident can not be null!");
-        manager.collectNextStatistics(nodeIdent);
+        manager.collectNextStatistics(nodeIdent, xid);
     }
 
     /**
index 230425999e3215045ac632a4d291e8cd5507c943..e17c45dc767f5050c4113c4df96b2c6aaf170232 100644 (file)
@@ -216,7 +216,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     }
                 }
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
index 5185ef0b821c51374ac1299e4b6bd295215e8a4f..944ccfab5fd4433c37665037101d86e988a9b56d 100644 (file)
@@ -137,7 +137,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 /* Delete all not presented Group Nodes */
                 deleteAllNotPresentNode(fNodeIdent, tx, Collections.unmodifiableList(existGroupKeys));
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
@@ -247,8 +247,8 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                     }
                     statGroupCommit(((GroupStatisticsUpdated) notif).getGroupStats(), nodeIdent, tx);
                 }
-                if (notifGroup.isPresent()) {
-                    notifyToCollectNextStatistics(nodeIdent);
+                if ( ! notifGroup.isPresent()) {
+                    notifyToCollectNextStatistics(nodeIdent, transId);
                 }
             }
         });
index d6988a6f2b8dac73197afc80b082fab023e4c9a7..2d5be71dcf914e41f0d8b9d61db9d5d7429ad67d 100644 (file)
@@ -137,7 +137,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 /* Delete all not presented Meter Nodes */
                 deleteAllNotPresentedNodes(fNodeIdent, tx, Collections.unmodifiableList(existMeterKeys));
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
@@ -247,8 +247,8 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                     }
                     statMeterCommit(((MeterStatisticsUpdated) notif).getMeterStats(), nodeIdent, tx);
                 }
-                if (notifMeter.isPresent()) {
-                    notifyToCollectNextStatistics(nodeIdent);
+                if ( ! notifMeter.isPresent()) {
+                    notifyToCollectNextStatistics(nodeIdent, transId);
                 }
             }
         });
index f5c5689b7bcbe162178ff44e42c2742ecd3574c6..77d7c7d312345a74451fc45c4c1f972ed23fd9c0 100644 (file)
@@ -137,7 +137,7 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
                 /* Delete all not presented Group Nodes */
                 deleteAllNotPresentedNodes(nodeIdent, tx, Collections.unmodifiableMap(existQueueKeys));
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
index fb124376b6be0875e1396559966e8c3da178a852..131de73f9ded30b75c1d039a9a9d5a9ab22def47 100644 (file)
@@ -105,7 +105,7 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
                 statPortCommit(portStats, nodeIdent, trans);
                 /* Notification for continue collecting statistics - Port statistics are still same size
                  * and they are small - don't need to wait for whole apply operation*/
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
index b41bd4973c99d3391509384edf7a7c49fbbf95cf..53bca87034e263bf0b4b73eb07bb6579dbf63043 100644 (file)
@@ -105,7 +105,7 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
                 statTableCommit(tableStats, nodeIdent, trans);
                 /* Notification for continue collecting statistics - Tables statistics are still same size
                  * and they are small - don't need to wait to whole apply operation */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
index ff1778e8aa9900c0512c6888d8580c92b4c75482..9dd70b5c2837287285feb7b6a54180d0e6866a5f 100644 (file)
@@ -6,12 +6,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
@@ -40,7 +42,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
 
-    private final static long STAT_COLLECT_TIME_OUT = 30000L;
+    private final static long STAT_COLLECT_TIME_OUT = 3000L;
 
     private final ExecutorService statNetCollectorServ;
     private final StatisticsManager manager;
@@ -51,12 +53,14 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final Object statCollectorLock = new Object();
     private final Object statNodeHolderLock = new Object();
+    private final Object transNotifyLock = new Object();
 
     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
 
     private volatile boolean wakeMe = false;
     private volatile boolean finishing = false;
+    private TransactionId actualTransactionId;
 
     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
             final int maxNodeForCollectors) {
@@ -77,7 +81,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
     public void close() {
         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
         finishing = true;
-        collectNextStatistics();
+        collectNextStatistics(actualTransactionId);
         statNetCollectorServ.shutdown();
     }
 
@@ -134,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
                     }
                     if (statNodeHolder.isEmpty()) {
                         finishing = true;
-                        collectNextStatistics();
+                        collectNextStatistics(actualTransactionId);
                         statNetCollectorServ.shutdown();
                     }
                     return true;
@@ -172,12 +176,14 @@ public class StatPermCollectorImpl implements StatPermCollector {
     }
 
     @Override
-    public void collectNextStatistics() {
-        if (wakeMe) {
-            synchronized (statCollectorLock) {
-                if (wakeMe) {
-                    LOG.trace("STAT-COLLECTOR is notified to conntinue");
-                    statCollectorLock.notify();
+    public void collectNextStatistics(final TransactionId xid) {
+        if (checkTransactionId(xid)) {
+            if (wakeMe) {
+                synchronized (statCollectorLock) {
+                    if (wakeMe) {
+                        LOG.trace("STAT-COLLECTOR is notified to conntinue");
+                        statCollectorLock.notify();
+                    }
                 }
             }
         }
@@ -186,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector {
     @Override
     public void run() {
         try {
+            // sleep 5 second before collecting all statistics cycles is important
+            // for loading all Nodes to Operational/DS
             Thread.sleep(5000);
         }
         catch (final InterruptedException e1) {
@@ -234,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
             } catch (final InterruptedException e) {
                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
             } finally {
+                setActualTransactionId(null);
                 wakeMe = false;
             }
         }
@@ -249,49 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector {
                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
                     break;
                 }
-                switch (statMarker) {
-                case PORT_STATS:
-                    LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case QUEUE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case TABLE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case GROUP_STATS:
-                    LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case METER_STATS:
-                    LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case FLOW_STATS:
-                    LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
-                    waitingForNotification();
-                    LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
-                    for (short i = 0; i < maxTables; i++) {
-                        final TableId tableId = new TableId(i);
-                        manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                try {
+                    switch (statMarker) {
+                    case PORT_STATS:
+                        LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case QUEUE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case TABLE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case GROUP_STATS:
+                        LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case METER_STATS:
+                        LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case FLOW_STATS:
+                        LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+                        for (short i = 0; i < maxTables; i++) {
+                            final TableId tableId = new TableId(i);
+                            manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                        }
+                        break;
+                    default:
+                        /* Exception for programmers in implementation cycle */
+                        throw new IllegalStateException("Not implemented ASK for " + statMarker);
                     }
-                    break;
-                default:
-                    /* Exception for programmers in implementation cycle */
-                    throw new IllegalStateException("Not implemented ASK for " + statMarker);
+                } catch (InterruptedException | ExecutionException ex) {
+                    LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+                    continue;
                 }
             }
         }
@@ -333,5 +347,17 @@ public class StatPermCollectorImpl implements StatPermCollector {
         }
         return true;
     }
+
+    private boolean checkTransactionId(final TransactionId xid) {
+        synchronized (transNotifyLock) {
+            return actualTransactionId != null && actualTransactionId.equals(xid);
+        }
+    }
+
+    private void setActualTransactionId(final TransactionId transactionId) {
+        synchronized (transNotifyLock) {
+            actualTransactionId = transactionId;
+        }
+    }
 }
 
index 176e52708b0a9a7c8ce91710ad2631285904a283..4870223c0ff4506e01cb9614b4ef209981ab85fd 100644 (file)
@@ -40,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
@@ -82,7 +83,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
 
-    private final long maxLifeForRequest = 50; /* 50 second */
     private final int queueCapacity = 5000;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
@@ -97,7 +97,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     private volatile boolean finishing = false;
 
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
-            final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+            final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
         groupStatsService = Preconditions.checkNotNull(
@@ -120,7 +120,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 "OpendaylightQueueStatisticsService can not be null!");
 
         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
-        txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+        /* nr. 7 is here nr. of possible statistic which are waiting for notification
+         *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
                 .maximumSize(10000).build();
     }
 
@@ -163,7 +165,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     @Override
     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
-            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+            final SettableFuture<TransactionId> resultTransId) {
 
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
@@ -174,6 +177,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 if (id == null) {
                     LOG.warn("No protocol support");
                 } else {
+                    if (resultTransId != null) {
+                        resultTransId.set(id);
+                    }
                     final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
@@ -264,8 +270,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
 
             @Override
@@ -274,16 +281,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllGroupStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getAllGroupStatistics(builder.build()), null, nodeRef);
+                        .getAllGroupStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllGroupStat);
+        return result;
     }
 
     @Override
-    public void getAllMetersStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
 
             @Override
@@ -292,16 +301,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllMeterStat);
+        return result;
     }
 
     @Override
-    public void getAllFlowsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
 
             @Override
@@ -310,11 +321,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllFlowStat);
+        return result;
     }
 
     @Override
@@ -334,7 +346,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 tbuilder.setId(tableId.getValue());
                 tbuilder.setKey(new TableKey(tableId.getValue()));
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
                 return null;
             }
         };
@@ -342,8 +354,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllPortsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
 
             @Override
@@ -351,17 +364,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
                         new GetAllNodeConnectorsStatisticsInputBuilder();
                 builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(portStatsService
-                        .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+                final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+                        portStatsService.getAllNodeConnectorsStatistics(builder.build());
+                registrationRpcFutureCallBack(rpc, null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllPortsStat);
+        return result;
     }
 
     @Override
-    public void getAllTablesStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
 
             @Override
@@ -370,16 +386,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetFlowTablesStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowTableStatsService
-                        .getFlowTablesStatistics(builder.build()), null, nodeRef);
+                        .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllTableStat);
+        return result;
     }
 
     @Override
-    public void getAllQueueStat(final NodeRef nodeRef) {
+    public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
 
             @Override
@@ -388,16 +406,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(queueStatsService
-                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllQueueStat);
+        return result;
     }
 
     @Override
-    public void getAllMeterConfigStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
 
             @Override
@@ -406,11 +426,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterConfigStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(qetAllMeterConfStat);
+        return result;
     }
 
     @Override
@@ -423,7 +444,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -440,7 +461,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -448,8 +469,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsConfStats(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
 
             @Override
@@ -458,12 +480,13 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetGroupDescriptionInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getGroupDescription(builder.build()), null, nodeRef);
+                        .getGroupDescription(builder.build()), null, nodeRef, result);
 
                 return null;
             }
         };
         addGetAllStatJob(getAllGropConfStat);
+        return result;
     }
 
     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
index edf9fad433f6af8244948fc181dfce97b3d24177..1d03e38c165c0a51e3a257730792389abd4d42a5 100644 (file)
@@ -8,8 +8,15 @@
 
 package org.opendaylight.controller.md.statistics.manager.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -28,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.me
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
@@ -39,14 +47,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
 * statistics-manager
@@ -90,8 +92,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    private final StatisticsManagerConfig statManagerConfig;
 
-   public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
-       this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
+   public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
+       statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
        ThreadFactory threadFact;
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
@@ -105,7 +107,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public void start(final NotificationProviderService notifService,
            final RpcConsumerRegistry rpcRegistry) {
        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-       rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
+       rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
        statCollectors = Collections.emptyList();
        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
@@ -247,10 +249,10 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    }
 
    @Override
-   public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
+   public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
        for (final StatPermCollector collector : statCollectors) {
            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
-               collector.collectNextStatistics();
+               collector.collectNextStatistics(xid);
            }
        }
    }