4c3f573ccca6cbadbab4e2213cb1daa9e78d8d66
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import java.math.BigInteger;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19
20 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
21 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
22 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
23 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import com.google.common.base.Joiner;
57 import com.google.common.base.Optional;
58 import com.google.common.base.Preconditions;
59 import com.google.common.cache.Cache;
60 import com.google.common.cache.CacheBuilder;
61 import com.google.common.util.concurrent.FutureCallback;
62 import com.google.common.util.concurrent.Futures;
63 import com.google.common.util.concurrent.JdkFutureAdapters;
64 import com.google.common.util.concurrent.SettableFuture;
65
66
/**
 * statistics-manager
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * This class looks up all RPC Statistics Device Services and implements pre-defined
 * wrapper methods (getAll...Stat) that give easy access to those services.
 *
 * The class also implements the process of joining multipart messages.
 * Internally it uses a Guava Cache, keyed by transaction id and node id, to hold the values needed
 * for joining a multipart message: every cached container keeps all multipart messages received for
 * its transaction together with the optional input Config/DS light-weight DataObject (a DataObject
 * that contains only the necessary identification fields such as TableId, GroupId, MeterId or,
 * for a flow, Match, Priority, FlowCookie, TableId and FlowId ...).
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 *
 */
84 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
85
86
87     private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
88
89     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
90
91     private static final int MAX_CACHE_SIZE = 10000;
92     private static final int QUEUE_CAPACITY = 5000;
93
94     private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
95     private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
96     private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
97     /**
98      *  Number of possible statistic which are waiting for notification
99      *      - check it in StatPermCollectorImpl method collectStatCrossNetwork()
100      */
101     private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
102
103     private final OpendaylightGroupStatisticsService groupStatsService;
104     private final OpendaylightMeterStatisticsService meterStatsService;
105     private final OpendaylightFlowStatisticsService flowStatsService;
106     private final OpendaylightPortStatisticsService portStatsService;
107     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
108     private final OpendaylightQueueStatisticsService queueStatsService;
109
110     private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
111
112     private volatile boolean finishing = false;
113
114     public StatRpcMsgManagerImpl (final StatisticsManager manager,
115             final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
116         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
117         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
118         groupStatsService = Preconditions.checkNotNull(
119                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
120                 "OpendaylightGroupStatisticsService can not be null!");
121         meterStatsService = Preconditions.checkNotNull(
122                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
123                 "OpendaylightMeterStatisticsService can not be null!");
124         flowStatsService = Preconditions.checkNotNull(
125                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
126                 "OpendaylightFlowStatisticsService can not be null!");
127         portStatsService = Preconditions.checkNotNull(
128                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
129                 "OpendaylightPortStatisticsService can not be null!");
130         flowTableStatsService = Preconditions.checkNotNull(
131                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
132                 "OpendaylightFlowTableStatisticsService can not be null!");
133         queueStatsService = Preconditions.checkNotNull(
134                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
135                 "OpendaylightQueueStatisticsService can not be null!");
136
137         statsRpcJobQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
138         txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
139                 .maximumSize(MAX_CACHE_SIZE).build();
140     }
141
142     @Override
143     public void close() {
144         finishing = true;
145         statsRpcJobQueue = null;
146     }
147
148     @Override
149     public void run() {
150          /* Neverending cyle - wait for finishing */
151         while ( ! finishing) {
152             try {
153                 statsRpcJobQueue.take().call();
154             }
155             catch (final Exception e) {
156                 LOG.warn("Stat Element RPC executor fail!", e);
157             }
158         }
159         // Drain all rpcCall, making sure any blocked threads are unblocked
160         while ( ! statsRpcJobQueue.isEmpty()) {
161             statsRpcJobQueue.poll();
162         }
163     }
164
165     private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
166         final boolean success = statsRpcJobQueue.offer(getAllStatJob);
167         if ( ! success) {
168             LOG.warn("Put RPC request getAllStat fail! Queue is full.");
169         }
170     }
171
172     private void addStatJob(final RpcJobsQueue getStatJob) {
173         final boolean success = statsRpcJobQueue.offer(getStatJob);
174         if ( ! success) {
175             LOG.debug("Put RPC request for getStat fail! Queue is full.");
176         }
177     }
178
179     @Override
180     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
181             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
182             final SettableFuture<TransactionId> resultTransId) {
183
184         class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
185             @Override
186             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
187                 final TransactionId id = result.getResult().getTransactionId();
188                 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
189                 if (id == null) {
190                     String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
191                     LOG.warn("Node [{}] does not support statistics request type : {}",
192                             nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
193                     if (resultTransId != null) {
194                         resultTransId.setException(
195                             new UnsupportedOperationException());
196                     }
197                 } else {
198                     if (resultTransId != null) {
199                         resultTransId.set(id);
200                     }
201                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
202                     final TransactionCacheContainer<? super TransactionAware> container =
203                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
204                     txCache.put(cacheKey, container);
205                 }
206             }
207
208             @Override
209             public void onFailure(final Throwable t) {
210                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
211                 if (resultTransId != null) {
212                     if (t instanceof DOMRpcImplementationNotAvailableException) {
213                         //If encountered with RPC not availabe exception, retry till
214                         // stats manager remove the node from the stats collector pool
215                         resultTransId.set(StatPermCollectorImpl.getFakeTxId());
216                     } else {
217                         resultTransId.setException(t);
218                     }
219                 }
220             }
221         }
222
223         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
224     }
225
226     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
227         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
228     }
229
230     @Override
231     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
232             final TransactionId id, final NodeId nodeId) {
233         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
234         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
235
236         final String key = buildCacheKey(id, nodeId);
237         final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
238
239         final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
240
241             @Override
242             public Void call() throws Exception {
243                 final Optional<TransactionCacheContainer<?>> resultContainer =
244                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
245                 if (resultContainer.isPresent()) {
246                     txCache.invalidate(key);
247                 }
248                 result.set(resultContainer);
249                 return null;
250             }
251         };
252         addStatJob(getTransactionCacheContainer);
253         return result;
254     }
255
256     @Override
257     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
258         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
259         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
260
261         final String key = buildCacheKey(id, nodeId);
262         final SettableFuture<Boolean> checkStatId = SettableFuture.create();
263
264         final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
265
266             @Override
267             public Void call() throws Exception {
268                 final Optional<TransactionCacheContainer<?>> result =
269                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
270                 checkStatId.set(Boolean.valueOf(result.isPresent()));
271                 return null;
272             }
273         };
274         addStatJob(isExpecedStatistics);
275         return checkStatId;
276     }
277
278     @Override
279     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
280         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
281         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
282
283         final RpcJobsQueue addNotification = new RpcJobsQueue() {
284
285             @Override
286             public Void call() throws Exception {
287                 final TransactionId txId = notification.getTransactionId();
288                 final String key = buildCacheKey(txId, nodeId);
289                 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
290                 if (container != null) {
291                     container.addNotif(notification);
292                 }
293                 return null;
294             }
295         };
296         addStatJob(addNotification);
297     }
298
299     @Override
300     public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
301         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
302         final SettableFuture<TransactionId> result = SettableFuture.create();
303         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
304
305             @Override
306             public Void call() throws Exception {
307                 final GetAllGroupStatisticsInputBuilder builder =
308                         new GetAllGroupStatisticsInputBuilder();
309                 builder.setNode(nodeRef);
310                 registrationRpcFutureCallBack(groupStatsService
311                         .getAllGroupStatistics(builder.build()), null, nodeRef, result);
312                 return null;
313             }
314         };
315         addGetAllStatJob(getAllGroupStat);
316         return result;
317     }
318
319     @Override
320     public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
321         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
322         final SettableFuture<TransactionId> result = SettableFuture.create();
323         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
324
325             @Override
326             public Void call() throws Exception {
327                 final GetAllMeterStatisticsInputBuilder builder =
328                         new GetAllMeterStatisticsInputBuilder();
329                 builder.setNode(nodeRef);
330                 registrationRpcFutureCallBack(meterStatsService
331                         .getAllMeterStatistics(builder.build()), null, nodeRef, result);
332                 return null;
333             }
334         };
335         addGetAllStatJob(getAllMeterStat);
336         return result;
337     }
338
339     @Override
340     public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
341         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
342         final SettableFuture<TransactionId> result = SettableFuture.create();
343         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
344
345             @Override
346             public Void call() throws Exception {
347                 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
348                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
349                 builder.setNode(nodeRef);
350                 registrationRpcFutureCallBack(flowStatsService
351                         .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
352                 return null;
353             }
354         };
355         addGetAllStatJob(getAllFlowStat);
356         return result;
357     }
358
359     @Override
360     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
361         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
362         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
363         final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
364
365             @Override
366             public Void call() throws Exception {
367                 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
368                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
369                 builder.setNode(nodeRef);
370                 builder.setTableId(tableId);
371
372                 final TableBuilder tbuilder = new TableBuilder();
373                 tbuilder.setId(tableId.getValue());
374                 tbuilder.setKey(new TableKey(tableId.getValue()));
375                 registrationRpcFutureCallBack(flowStatsService
376                         .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
377                 return null;
378             }
379         };
380         addGetAllStatJob(getAggregateFlowStat);
381     }
382
383     @Override
384     public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
385         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
386         final SettableFuture<TransactionId> result = SettableFuture.create();
387         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
388
389             @Override
390             public Void call() throws Exception {
391                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
392                         new GetAllNodeConnectorsStatisticsInputBuilder();
393                 builder.setNode(nodeRef);
394                 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
395                         portStatsService.getAllNodeConnectorsStatistics(builder.build());
396                 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
397                 return null;
398             }
399         };
400         addGetAllStatJob(getAllPortsStat);
401         return result;
402     }
403
404     @Override
405     public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
406         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
407         final SettableFuture<TransactionId> result = SettableFuture.create();
408         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
409
410             @Override
411             public Void call() throws Exception {
412                 final GetFlowTablesStatisticsInputBuilder builder =
413                         new GetFlowTablesStatisticsInputBuilder();
414                 builder.setNode(nodeRef);
415                 registrationRpcFutureCallBack(flowTableStatsService
416                         .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
417                 return null;
418             }
419         };
420         addGetAllStatJob(getAllTableStat);
421         return result;
422     }
423
424     @Override
425     public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
426         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
427         final SettableFuture<TransactionId> result = SettableFuture.create();
428         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
429
430             @Override
431             public Void call() throws Exception {
432                 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
433                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
434                 builder.setNode(nodeRef);
435                 registrationRpcFutureCallBack(queueStatsService
436                         .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
437                 return null;
438             }
439         };
440         addGetAllStatJob(getAllQueueStat);
441         return result;
442     }
443
444     @Override
445     public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
446         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
447         final SettableFuture<TransactionId> result = SettableFuture.create();
448         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
449
450             @Override
451             public Void call() throws Exception {
452                 final GetAllMeterConfigStatisticsInputBuilder builder =
453                         new GetAllMeterConfigStatisticsInputBuilder();
454                 builder.setNode(nodeRef);
455                 registrationRpcFutureCallBack(meterStatsService
456                         .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
457                 return null;
458             }
459         };
460         addGetAllStatJob(qetAllMeterConfStat);
461         return result;
462     }
463
464     @Override
465     public void getGroupFeaturesStat(final NodeRef nodeRef) {
466         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
467         final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
468
469             @Override
470             public Void call() throws Exception {
471                 /* RPC input */
472                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
473                 input.setNode(nodeRef);
474                 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
475                 return null;
476             }
477         };
478         addStatJob(getGroupFeaturesStat);
479     }
480
481     @Override
482     public void getMeterFeaturesStat(final NodeRef nodeRef) {
483         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
484         final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
485
486             @Override
487             public Void call() throws Exception {
488                 /* RPC input */
489                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
490                 input.setNode(nodeRef);
491                 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
492                 return null;
493             }
494         };
495         addStatJob(getMeterFeaturesStat);
496     }
497
498     @Override
499     public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
500         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
501         final SettableFuture<TransactionId> result = SettableFuture.create();
502         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
503
504             @Override
505             public Void call() throws Exception {
506                 final GetGroupDescriptionInputBuilder builder =
507                         new GetGroupDescriptionInputBuilder();
508                 builder.setNode(nodeRef);
509                 registrationRpcFutureCallBack(groupStatsService
510                         .getGroupDescription(builder.build()), null, nodeRef, result);
511
512                 return null;
513             }
514         };
515         addGetAllStatJob(getAllGropConfStat);
516         return result;
517     }
518
519     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
520
521         private final TransactionId id;
522         private final NodeId nId;
523         private final List<T> notifications;
524         private final Optional<? extends DataObject> confInput;
525
526         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
527             this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
528             notifications = new CopyOnWriteArrayList<T>();
529             confInput = Optional.fromNullable(input);
530             nId = nodeId;
531         }
532
533         @Override
534         public void addNotif(final T notif) {
535             notifications.add(notif);
536         }
537
538         @Override
539         public TransactionId getId() {
540             return id;
541         }
542
543         @Override
544         public NodeId getNodeId() {
545             return nId;
546         }
547
548         @Override
549         public List<T> getNotifications() {
550             return notifications;
551         }
552
553         @Override
554         public Optional<? extends DataObject> getConfInput() {
555             return confInput;
556         }
557     }
558 }
559