Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatRpcMsgManagerImpl.java
index 91cd7f164a8ae0be015bb9d3415aa54b9e42e816..0515ca1d1dd9fd97d23110cc8df5fa845f4c001d 100644 (file)
@@ -8,14 +8,14 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
@@ -26,8 +26,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.G
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
@@ -81,11 +81,33 @@ import com.google.common.util.concurrent.SettableFuture;
  */
 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
-    private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
+
+    private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
 
     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
 
-    private final int queueCapacity = 5000;
+    /**
+     * Cache for futures to be returned by
+     * {@link #isExpectedStatistics(TransactionId, NodeId)}.
+     */
+    private final Cache<String, SettableFuture<Boolean>>  txFutureCache;
+
+    /**
+     * The number of seconds to wait for transaction container to be put into
+     * {@link #txCache}.
+     */
+    private static final long TXCACHE_WAIT_TIMEOUT = 10L;
+
+    private static final int MAX_CACHE_SIZE = 10000;
+
+    private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
+    private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
+    private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
+    /**
+     *  Number of possible statistic which are waiting for notification
+     *      - check it in StatPermCollectorImpl method collectStatCrossNetwork()
+     */
+    private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
     private final OpendaylightMeterStatisticsService meterStatsService;
@@ -94,10 +116,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
     private final OpendaylightQueueStatisticsService queueStatsService;
 
-    private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
-
-    private volatile boolean finishing = false;
-
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
             final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
@@ -121,48 +139,11 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
                 "OpendaylightQueueStatisticsService can not be null!");
 
-        statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
-        /* nr. 7 is here nr. of possible statistic which are waiting for notification
-         *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
-        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
-                .maximumSize(10000).build();
-    }
-
-    @Override
-    public void close() {
-        finishing = true;
-        statsRpcJobQueue = null;
-    }
-
-    @Override
-    public void run() {
-         /* Neverending cyle - wait for finishing */
-        while ( ! finishing) {
-            try {
-                statsRpcJobQueue.take().call();
-            }
-            catch (final Exception e) {
-                LOG.warn("Stat Element RPC executor fail!", e);
-            }
-        }
-        // Drain all rpcCall, making sure any blocked threads are unblocked
-        while ( ! statsRpcJobQueue.isEmpty()) {
-            statsRpcJobQueue.poll();
-        }
-    }
-
-    private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
-        final boolean success = statsRpcJobQueue.offer(getAllStatJob);
-        if ( ! success) {
-            LOG.warn("Put RPC request getAllStat fail! Queue is full.");
-        }
-    }
-
-    private void addStatJob(final RpcJobsQueue getStatJob) {
-        final boolean success = statsRpcJobQueue.offer(getStatJob);
-        if ( ! success) {
-            LOG.debug("Put RPC request for getStat fail! Queue is full.");
-        }
+        txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
+                .maximumSize(MAX_CACHE_SIZE).build();
+        txFutureCache = CacheBuilder.newBuilder().
+            expireAfterWrite(TXCACHE_WAIT_TIMEOUT, TimeUnit.SECONDS).
+            maximumSize(MAX_CACHE_SIZE).build();
     }
 
     @Override
@@ -170,9 +151,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
             final SettableFuture<TransactionId> resultTransId) {
 
-        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
-                new FutureCallback<RpcResult<? extends TransactionAware>>() {
-
+        class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
             @Override
             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
                 final TransactionId id = result.getResult().getTransactionId();
@@ -181,6 +160,10 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                     String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
                     LOG.warn("Node [{}] does not support statistics request type : {}",
                             nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
+                    if (resultTransId != null) {
+                        resultTransId.setException(
+                            new UnsupportedOperationException());
+                    }
                 } else {
                     if (resultTransId != null) {
                         resultTransId.set(id);
@@ -188,308 +171,252 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
                     final TransactionCacheContainer<? super TransactionAware> container =
                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
-                    txCache.put(cacheKey, container);
+                    putTransaction(cacheKey, container);
                 }
             }
 
             @Override
             public void onFailure(final Throwable t) {
                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
+                if (resultTransId != null) {
+                    if (t instanceof DOMRpcImplementationNotAvailableException) {
+                        //If encountered with RPC not availabe exception, retry till
+                        // stats manager remove the node from the stats collector pool
+                        resultTransId.set(StatPermCollectorImpl.getFakeTxId());
+                    } else {
+                        resultTransId.setException(t);
+                    }
+                }
             }
+        }
 
-        });
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
     }
 
     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
     }
 
+    /**
+     * Put the given statistics transaction container into the cache.
+     *
+     * @param key        Key that specifies the given transaction container.
+     * @param container  Transaction container.
+     */
+    private synchronized void putTransaction(
+        String key, TransactionCacheContainer<? super TransactionAware> container) {
+        txCache.put(key, container);
+
+        SettableFuture<Boolean> future = txFutureCache.asMap().remove(key);
+        if (future != null) {
+            // Wake up a thread waiting for this transaction container.
+            future.set(true);
+        }
+    }
+
+    /**
+     * Check to see if the specified transaction container is cached in
+     * {@link #txCache}.
+     *
+     * @param key  Key that specifies the transaction container.
+     * @return  A future that will contain the result.
+     */
+    private synchronized Future<Boolean> isExpectedStatistics(String key) {
+        Future<Boolean> future;
+        TransactionCacheContainer<?> container = txCache.getIfPresent(key);
+        if (container == null) {
+            // Wait for the transaction container to be put into the cache.
+            SettableFuture<Boolean> f = SettableFuture.<Boolean>create();
+            SettableFuture<Boolean> current =
+                txFutureCache.asMap().putIfAbsent(key, f);
+            future = (current == null) ? f : current;
+        } else {
+            future = Futures.immediateFuture(Boolean.TRUE);
+        }
+
+        return future;
+    }
+
     @Override
     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
             final TransactionId id, final NodeId nodeId) {
-        Preconditions.checkArgument(id != null, "TransactionId can not be null!");
-        Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
-        final String key = buildCacheKey(id, nodeId);
-        final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
-
-        final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
+        Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
+        Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
+
+        String key = buildCacheKey(id, nodeId);
+        Optional<TransactionCacheContainer<?>> resultContainer =
+            Optional.<TransactionCacheContainer<?>> fromNullable(
+                txCache.asMap().remove(key));
+        if (!resultContainer.isPresent()) {
+            LOG.warn("Transaction cache not found: {}", key);
+        }
 
-            @Override
-            public Void call() throws Exception {
-                final Optional<TransactionCacheContainer<?>> resultContainer =
-                        Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
-                if (resultContainer.isPresent()) {
-                    txCache.invalidate(key);
-                }
-                result.set(resultContainer);
-                return null;
-            }
-        };
-        addStatJob(getTransactionCacheContainer);
-        return result;
+        return Futures.immediateFuture(resultContainer);
     }
 
     @Override
     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
-        Preconditions.checkArgument(id != null, "TransactionId can not be null!");
-        Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
-        final String key = buildCacheKey(id, nodeId);
-        final SettableFuture<Boolean> checkStatId = SettableFuture.create();
+        Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
+        Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
 
-        final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final Optional<TransactionCacheContainer<?>> result =
-                        Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
-                checkStatId.set(Boolean.valueOf(result.isPresent()));
-                return null;
-            }
-        };
-        addStatJob(isExpecedStatistics);
-        return checkStatId;
+        String key = buildCacheKey(id, nodeId);
+        return isExpectedStatistics(key);
     }
 
     @Override
     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
-        Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
-        final RpcJobsQueue addNotification = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final TransactionId txId = notification.getTransactionId();
-                final String key = buildCacheKey(txId, nodeId);
-                final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
-                if (container != null) {
-                    container.addNotif(notification);
-                }
-                return null;
-            }
-        };
-        addStatJob(addNotification);
+        Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
+
+        TransactionId txId = notification.getTransactionId();
+        String key = buildCacheKey(txId, nodeId);
+        TransactionCacheContainer<? super TransactionAware> container =
+            txCache.getIfPresent(key);
+        if (container != null) {
+            container.addNotif(notification);
+        } else {
+            LOG.warn("Unable to add notification: {}, {}", key,
+                     notification.getImplementedInterface());
+        }
     }
 
     @Override
     public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllGroupStatisticsInputBuilder builder =
-                        new GetAllGroupStatisticsInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService
-                        .getAllGroupStatistics(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllGroupStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllGroupStatisticsInputBuilder builder =
+            new GetAllGroupStatisticsInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            groupStatsService.getAllGroupStatistics(builder.build()), null,
+            nodeRef, result);
         return result;
     }
 
     @Override
     public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllMeterStatisticsInputBuilder builder =
-                        new GetAllMeterStatisticsInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterStatistics(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllMeterStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllMeterStatisticsInputBuilder builder =
+            new GetAllMeterStatisticsInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            meterStatsService.getAllMeterStatistics(builder.build()), null,
+            nodeRef, result);
         return result;
     }
 
     @Override
     public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
-                        new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(flowStatsService
-                        .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllFlowStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
+            new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            flowStatsService.getAllFlowsStatisticsFromAllFlowTables(builder.build()),
+            null, nodeRef, result);
         return result;
     }
 
     @Override
     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
-        final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
-                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-                builder.setNode(nodeRef);
-                builder.setTableId(tableId);
-
-                final TableBuilder tbuilder = new TableBuilder();
-                tbuilder.setId(tableId.getValue());
-                tbuilder.setKey(new TableKey(tableId.getValue()));
-                registrationRpcFutureCallBack(flowStatsService
-                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAggregateFlowStat);
+        GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+            new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+        builder.setNode(nodeRef).setTableId(tableId);
+
+        TableBuilder tbuilder = new TableBuilder().
+            setId(tableId.getValue()).
+            setKey(new TableKey(tableId.getValue()));
+        registrationRpcFutureCallBack(
+            flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()),
+            tbuilder.build(), nodeRef, null);
     }
 
     @Override
     public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllNodeConnectorsStatisticsInputBuilder builder =
-                        new GetAllNodeConnectorsStatisticsInputBuilder();
-                builder.setNode(nodeRef);
-                final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
-                        portStatsService.getAllNodeConnectorsStatistics(builder.build());
-                registrationRpcFutureCallBack(rpc, null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllPortsStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllNodeConnectorsStatisticsInputBuilder builder =
+            new GetAllNodeConnectorsStatisticsInputBuilder();
+        builder.setNode(nodeRef);
+        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+            portStatsService.getAllNodeConnectorsStatistics(builder.build());
+        registrationRpcFutureCallBack(rpc, null, nodeRef, result);
         return result;
     }
 
     @Override
     public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetFlowTablesStatisticsInputBuilder builder =
-                        new GetFlowTablesStatisticsInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(flowTableStatsService
-                        .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllTableStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetFlowTablesStatisticsInputBuilder builder =
+            new GetFlowTablesStatisticsInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            flowTableStatsService.getFlowTablesStatistics(builder.build()),
+            null, nodeRef, result);
         return result;
     }
 
     @Override
     public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
-                        new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(queueStatsService
-                        .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllQueueStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
+            new GetAllQueuesStatisticsFromAllPortsInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            queueStatsService.getAllQueuesStatisticsFromAllPorts(builder.build()),
+            null, nodeRef, result);
         return result;
     }
 
     @Override
     public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetAllMeterConfigStatisticsInputBuilder builder =
-                        new GetAllMeterConfigStatisticsInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService
-                        .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
-                return null;
-            }
-        };
-        addGetAllStatJob(qetAllMeterConfStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetAllMeterConfigStatisticsInputBuilder builder =
+            new GetAllMeterConfigStatisticsInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            meterStatsService.getAllMeterConfigStatistics(builder.build()),
+            null, nodeRef, result);
         return result;
     }
 
     @Override
     public void getGroupFeaturesStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                /* RPC input */
-                final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
-                input.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
-                return null;
-            }
-        };
-        addStatJob(getGroupFeaturesStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder().
+            setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            groupStatsService.getGroupFeatures(input.build()), null, nodeRef,
+            null);
     }
 
     @Override
     public void getMeterFeaturesStat(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                /* RPC input */
-                final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
-                input.setNode(nodeRef);
-                registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
-                return null;
-            }
-        };
-        addStatJob(getMeterFeaturesStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder().
+            setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            meterStatsService.getMeterFeatures(input.build()), null, nodeRef,
+            null);
     }
 
     @Override
     public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
-        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
-        final SettableFuture<TransactionId> result = SettableFuture.create();
-        final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
-
-            @Override
-            public Void call() throws Exception {
-                final GetGroupDescriptionInputBuilder builder =
-                        new GetGroupDescriptionInputBuilder();
-                builder.setNode(nodeRef);
-                registrationRpcFutureCallBack(groupStatsService
-                        .getGroupDescription(builder.build()), null, nodeRef, result);
-
-                return null;
-            }
-        };
-        addGetAllStatJob(getAllGropConfStat);
+        Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+        SettableFuture<TransactionId> result = SettableFuture.create();
+        GetGroupDescriptionInputBuilder builder =
+            new GetGroupDescriptionInputBuilder();
+        builder.setNode(nodeRef);
+        registrationRpcFutureCallBack(
+            groupStatsService.getGroupDescription(builder.build()), null,
+            nodeRef, result);
         return result;
     }
 
@@ -501,7 +428,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
         private final Optional<? extends DataObject> confInput;
 
         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
-            this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
+            this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
             notifications = new CopyOnWriteArrayList<T>();
             confInput = Optional.fromNullable(input);
             nId = nodeId;
@@ -533,4 +460,3 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
         }
     }
 }
-