Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
index 176e52708b0a9a7c8ce91710ad2631285904a283..4870223c0ff4506e01cb9614b4ef209981ab85fd 100644 (file)
@@ -40,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
@@ -82,7 +83,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
 
-    private final long maxLifeForRequest = 50; /* 50 second */
     private final int queueCapacity = 5000;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
@@ -97,7 +97,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     private volatile boolean finishing = false;
 
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
-            final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+            final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
         groupStatsService = Preconditions.checkNotNull(
@@ -120,7 +120,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 "OpendaylightQueueStatisticsService can not be null!");
 
         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
-        txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+        /* nr. 7 is here nr. of possible statistic which are waiting for notification
+         *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
                 .maximumSize(10000).build();
     }
 
@@ -163,7 +165,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     @Override
     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
-            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+            final SettableFuture<TransactionId> resultTransId) {
 
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
@@ -174,6 +177,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 if (id == null) {
                     LOG.warn("No protocol support");
                 } else {
+                    if (resultTransId != null) {
+                        resultTransId.set(id);
+                    }
                     final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
@@ -264,8 +270,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
 
             @Override
@@ -274,16 +281,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllGroupStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getAllGroupStatistics(builder.build()), null, nodeRef);
+                        .getAllGroupStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllGroupStat);
+        return result;
     }
 
     @Override
-    public void getAllMetersStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
 
             @Override
@@ -292,16 +301,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllMeterStat);
+        return result;
     }
 
     @Override
-    public void getAllFlowsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
 
             @Override
@@ -310,11 +321,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllFlowStat);
+        return result;
     }
 
     @Override
@@ -334,7 +346,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 tbuilder.setId(tableId.getValue());
                 tbuilder.setKey(new TableKey(tableId.getValue()));
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
                 return null;
             }
         };
@@ -342,8 +354,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllPortsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
 
             @Override
@@ -351,17 +364,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
                         new GetAllNodeConnectorsStatisticsInputBuilder();
                 builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(portStatsService
-                        .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+                final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+                        portStatsService.getAllNodeConnectorsStatistics(builder.build());
+                registrationRpcFutureCallBack(rpc, null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllPortsStat);
+        return result;
     }
 
     @Override
-    public void getAllTablesStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
 
             @Override
@@ -370,16 +386,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetFlowTablesStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowTableStatsService
-                        .getFlowTablesStatistics(builder.build()), null, nodeRef);
+                        .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllTableStat);
+        return result;
     }
 
     @Override
-    public void getAllQueueStat(final NodeRef nodeRef) {
+    public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
 
             @Override
@@ -388,16 +406,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(queueStatsService
-                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllQueueStat);
+        return result;
     }
 
     @Override
-    public void getAllMeterConfigStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
 
             @Override
@@ -406,11 +426,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterConfigStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(qetAllMeterConfStat);
+        return result;
     }
 
     @Override
@@ -423,7 +444,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -440,7 +461,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -448,8 +469,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsConfStats(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
 
             @Override
@@ -458,12 +480,13 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetGroupDescriptionInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getGroupDescription(builder.build()), null, nodeRef);
+                        .getGroupDescription(builder.build()), null, nodeRef, result);
 
                 return null;
             }
         };
         addGetAllStatJob(getAllGropConfStat);
+        return result;
     }
 
     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {