BUG-2637: migration consequence - fix unit test
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
index 80585548403f2ba2bf1b0e9498e26152acd6dace..20341bcc66e0b961f58e7c52b5122b35f38d3976 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.md.statistics.manager.impl;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -15,10 +16,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
@@ -42,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
@@ -51,6 +51,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
@@ -84,9 +85,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
 
-    private final long maxLifeForRequest = 50; /* 50 second */
     private final int queueCapacity = 5000;
-    private final StatisticsManager manager;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
     private final OpendaylightMeterStatisticsService meterStatsService;
@@ -100,8 +99,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     private volatile boolean finishing = false;
 
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
-            final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
-        this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
+            final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
+        Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
         groupStatsService = Preconditions.checkNotNull(
                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
@@ -123,7 +122,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 "OpendaylightQueueStatisticsService can not be null!");
 
         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
-        txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+        /* nr. 7 is here nr. of possible statistic which are waiting for notification
+         *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
                 .maximumSize(10000).build();
     }
 
@@ -166,7 +167,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     @Override
     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
-            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+            final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+            final SettableFuture<TransactionId> resultTransId) {
 
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
@@ -174,10 +176,15 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
             @Override
             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
                 final TransactionId id = result.getResult().getTransactionId();
+                final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
                 if (id == null) {
-                    LOG.warn("No protocol support");
+                    String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
+                    LOG.warn("Node [{}] does not support statistics request type : {}",
+                            nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
                 } else {
-                    final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
+                    if (resultTransId != null) {
+                        resultTransId.set(id);
+                    }
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
@@ -267,8 +274,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
 
             @Override
@@ -277,16 +285,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllGroupStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getAllGroupStatistics(builder.build()), null, nodeRef);
+                        .getAllGroupStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllGroupStat);
+        return result;
     }
 
     @Override
-    public void getAllMetersStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
 
             @Override
@@ -295,16 +305,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllMeterStat);
+        return result;
     }
 
     @Override
-    public void getAllFlowsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
 
             @Override
@@ -313,44 +325,42 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowStatsService
-                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllFlowStat);
+        return result;
     }
 
     @Override
     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
-
-        manager.enqueue(new StatDataStoreOperation() {
+        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        Preconditions.checkArgument(tableId != null, "TableId can not be null!");
+        final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
 
             @Override
-            public void applyOperation(final ReadWriteTransaction tx) {
-                final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
-                    @Override
-                    public Void call() throws Exception {
-                        final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
-                                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-                        builder.setNode(nodeRef);
-                        builder.setTableId(tableId);
-
-                        final TableBuilder tbuilder = new TableBuilder();
-                        tbuilder.setId(tableId.getValue());
-                        tbuilder.setKey(new TableKey(tableId.getValue()));
-                        registrationRpcFutureCallBack(flowStatsService
-                                .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
-                        return null;
-                    }
-                };
-                addGetAllStatJob(getAggregateFlowStat);
+            public Void call() throws Exception {
+                final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+                builder.setNode(nodeRef);
+                builder.setTableId(tableId);
+
+                final TableBuilder tbuilder = new TableBuilder();
+                tbuilder.setId(tableId.getValue());
+                tbuilder.setKey(new TableKey(tableId.getValue()));
+                registrationRpcFutureCallBack(flowStatsService
+                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
+                return null;
             }
-        });
+        };
+        addGetAllStatJob(getAggregateFlowStat);
     }
 
     @Override
-    public void getAllPortsStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
 
             @Override
@@ -358,17 +368,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
                         new GetAllNodeConnectorsStatisticsInputBuilder();
                 builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(portStatsService
-                        .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+                final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+                        portStatsService.getAllNodeConnectorsStatistics(builder.build());
+                registrationRpcFutureCallBack(rpc, null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllPortsStat);
+        return result;
     }
 
     @Override
-    public void getAllTablesStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
 
             @Override
@@ -377,16 +390,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetFlowTablesStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(flowTableStatsService
-                        .getFlowTablesStatistics(builder.build()), null, nodeRef);
+                        .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllTableStat);
+        return result;
     }
 
     @Override
-    public void getAllQueueStat(final NodeRef nodeRef) {
+    public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
 
             @Override
@@ -395,16 +410,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(queueStatsService
-                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(getAllQueueStat);
+        return result;
     }
 
     @Override
-    public void getAllMeterConfigStat(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
 
             @Override
@@ -413,11 +430,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                         new GetAllMeterConfigStatisticsInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
                 return null;
             }
         };
         addGetAllStatJob(qetAllMeterConfStat);
+        return result;
     }
 
     @Override
@@ -430,7 +448,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -447,7 +465,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 /* RPC input */
                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
                 input.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
                 return null;
             }
         };
@@ -455,23 +473,24 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     }
 
     @Override
-    public void getAllGroupsConfStats(final NodeRef nodeRef) {
+    public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        final SettableFuture<TransactionId> result = SettableFuture.create();
         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
 
             @Override
             public Void call() throws Exception {
-                Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
                 final GetGroupDescriptionInputBuilder builder =
                         new GetGroupDescriptionInputBuilder();
                 builder.setNode(nodeRef);
                 registrationRpcFutureCallBack(groupStatsService
-                        .getGroupDescription(builder.build()), null, nodeRef);
+                        .getGroupDescription(builder.build()), null, nodeRef, result);
 
                 return null;
             }
         };
         addGetAllStatJob(getAllGropConfStat);
+        return result;
     }
 
     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {