BUG-2637: migration consequence - fix unit test
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
index 176e52708b0a9a7c8ce91710ad2631285904a283..20341bcc66e0b961f58e7c52b5122b35f38d3976 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.md.statistics.manager.impl;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
@@ -49,6 +51,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
@@ -82,7 +85,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
 
-    private final long maxLifeForRequest = 50; /* 50 second */
     private final int queueCapacity = 5000;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
@@ -97,7 +99,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     private volatile boolean finishing = false;
 
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
-            final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+            final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
         groupStatsService = Preconditions.checkNotNull(
@@ -120,7 +122,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 "OpendaylightQueueStatisticsService can not be null!");
 
         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
-        txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+        /* nr. 7 is here nr. of possible statistic which are waiting for notification
+         *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
                 .maximumSize(10000).build();
     }
 
@@ -163,7 +167,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     @Override
     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
-            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+            final SettableFuture<TransactionId> resultTransId) {
 
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
@@ -171,10 +176,15 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
             @Override
             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
                 final TransactionId id = result.getResult().getTransactionId();
+                final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                 if (id == null) {
-                    LOG.warn("No protocol support");
+                    String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
+                    LOG.warn("Node [{}] does not support statistics request type : {}",
+                            nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
                 } else {
-                    final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
+                    if (resultTransId != null) {
+                        resultTransId.set(id);
+                    }
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
@@ -264,8 +274,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
 
             @Override
@@ -274,16 +285,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllGroupStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getAllGroupStatistics(builder.build()), null, nodeRef);
+                        .getAllGroupStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllGroupStat);
+        return result;
     }
 
     @Override
-    public void getAllMetersStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
 
             @Override
@@ -292,16 +305,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllMeterStat);
+        return result;
     }
 
     @Override
-    public void getAllFlowsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
 
             @Override
@@ -310,11 +325,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllFlowStat);
+        return result;
     }
 
     @Override
@@ -334,7 +350,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 tbuilder.setId(tableId.getValue());
                 tbuilder.setKey(new TableKey(tableId.getValue()));
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
                 return null;
             }
         };
@@ -342,8 +358,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllPortsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
 
             @Override
@@ -351,17 +368,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
                         new GetAllNodeConnectorsStatisticsInputBuilder();
                 builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(portStatsService
-                        .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+                final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+                        portStatsService.getAllNodeConnectorsStatistics(builder.build());
+                registrationRpcFutureCallBack(rpc, null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllPortsStat);
+        return result;
     }
 
     @Override
-    public void getAllTablesStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
 
             @Override
@@ -370,16 +390,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetFlowTablesStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowTableStatsService
-                        .getFlowTablesStatistics(builder.build()), null, nodeRef);
+                        .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllTableStat);
+        return result;
     }
 
     @Override
-    public void getAllQueueStat(final NodeRef nodeRef) {
+    public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
 
             @Override
@@ -388,16 +410,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(queueStatsService
-                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllQueueStat);
+        return result;
     }
 
     @Override
-    public void getAllMeterConfigStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
 
             @Override
@@ -406,11 +430,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterConfigStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(qetAllMeterConfStat);
+        return result;
     }
 
     @Override
@@ -423,7 +448,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -440,7 +465,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -448,8 +473,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsConfStats(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
 
             @Override
@@ -458,12 +484,13 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetGroupDescriptionInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getGroupDescription(builder.build()), null, nodeRef);
+                        .getGroupDescription(builder.build()), null, nodeRef, result);
 
                 return null;
             }
         };
         addGetAllStatJob(getAllGropConfStat);
+        return result;
     }
 
     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {