External api proposal
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
20 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
21 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
63
64
/**
 * statistics-manager
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * The class obtains all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to them (getAllGroupsStat, getAllFlowsStat, ...).
 *
 * The class also implements the process of joining multipart messages.
 * Internally it uses a Guava Cache, keyed by transaction id and node id, to hold the values
 * needed for joining: each cached container collects the multipart messages of one transaction
 * together with the optional input Config/DS light-weight DataObject (the DataObject contains
 * only the necessary identification fields such as TableId, GroupId, MeterId or, for a flow,
 * Match, Priority, FlowCookie, TableId and FlowId ...).
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 *
 */
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
83
84
85     private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
86
87     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
88
89     private static final int MAX_CACHE_SIZE = 10000;
90     private static final int QUEUE_CAPACITY = 5000;
91
92     private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
93     private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
94     private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
95     /**
96      *  Number of possible statistic which are waiting for notification
97      *      - check it in StatPermCollectorImpl method collectStatCrossNetwork()
98      */
99     private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
100
101     private final OpendaylightGroupStatisticsService groupStatsService;
102     private final OpendaylightMeterStatisticsService meterStatsService;
103     private final OpendaylightFlowStatisticsService flowStatsService;
104     private final OpendaylightPortStatisticsService portStatsService;
105     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
106     private final OpendaylightQueueStatisticsService queueStatsService;
107
108     private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
109
110     private volatile boolean finishing = false;
111
112     public StatRpcMsgManagerImpl (final StatisticsManager manager,
113             final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
114         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
115         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
116         groupStatsService = Preconditions.checkNotNull(
117                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
118                 "OpendaylightGroupStatisticsService can not be null!");
119         meterStatsService = Preconditions.checkNotNull(
120                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
121                 "OpendaylightMeterStatisticsService can not be null!");
122         flowStatsService = Preconditions.checkNotNull(
123                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
124                 "OpendaylightFlowStatisticsService can not be null!");
125         portStatsService = Preconditions.checkNotNull(
126                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
127                 "OpendaylightPortStatisticsService can not be null!");
128         flowTableStatsService = Preconditions.checkNotNull(
129                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
130                 "OpendaylightFlowTableStatisticsService can not be null!");
131         queueStatsService = Preconditions.checkNotNull(
132                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
133                 "OpendaylightQueueStatisticsService can not be null!");
134
135         statsRpcJobQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
136         txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
137                 .maximumSize(MAX_CACHE_SIZE).build();
138     }
139
140     @Override
141     public void close() {
142         finishing = true;
143         statsRpcJobQueue = null;
144     }
145
146     @Override
147     public void run() {
148          /* Neverending cyle - wait for finishing */
149         while ( ! finishing) {
150             try {
151                 statsRpcJobQueue.take().call();
152             }
153             catch (final Exception e) {
154                 LOG.warn("Stat Element RPC executor fail!", e);
155             }
156         }
157         // Drain all rpcCall, making sure any blocked threads are unblocked
158         while ( ! statsRpcJobQueue.isEmpty()) {
159             statsRpcJobQueue.poll();
160         }
161     }
162
163     private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
164         final boolean success = statsRpcJobQueue.offer(getAllStatJob);
165         if ( ! success) {
166             LOG.warn("Put RPC request getAllStat fail! Queue is full.");
167         }
168     }
169
170     private void addStatJob(final RpcJobsQueue getStatJob) {
171         final boolean success = statsRpcJobQueue.offer(getStatJob);
172         if ( ! success) {
173             LOG.debug("Put RPC request for getStat fail! Queue is full.");
174         }
175     }
176
177     @Override
178     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
179             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
180             final SettableFuture<TransactionId> resultTransId) {
181
182         class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
183             @Override
184             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
185                 final TransactionId id = result.getResult().getTransactionId();
186                 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
187                 if (id == null) {
188                     String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
189                     LOG.warn("Node [{}] does not support statistics request type : {}",
190                             nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
191                 } else {
192                     if (resultTransId != null) {
193                         resultTransId.set(id);
194                     }
195                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
196                     final TransactionCacheContainer<? super TransactionAware> container =
197                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
198                     txCache.put(cacheKey, container);
199                 }
200             }
201
202             @Override
203             public void onFailure(final Throwable t) {
204                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
205             }
206
207         }
208
209         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
210     }
211
212     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
213         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
214     }
215
216     @Override
217     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
218             final TransactionId id, final NodeId nodeId) {
219         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
220         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
221
222         final String key = buildCacheKey(id, nodeId);
223         final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
224
225         final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
226
227             @Override
228             public Void call() throws Exception {
229                 final Optional<TransactionCacheContainer<?>> resultContainer =
230                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
231                 if (resultContainer.isPresent()) {
232                     txCache.invalidate(key);
233                 }
234                 result.set(resultContainer);
235                 return null;
236             }
237         };
238         addStatJob(getTransactionCacheContainer);
239         return result;
240     }
241
242     @Override
243     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
244         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
245         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
246
247         final String key = buildCacheKey(id, nodeId);
248         final SettableFuture<Boolean> checkStatId = SettableFuture.create();
249
250         final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
251
252             @Override
253             public Void call() throws Exception {
254                 final Optional<TransactionCacheContainer<?>> result =
255                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
256                 checkStatId.set(Boolean.valueOf(result.isPresent()));
257                 return null;
258             }
259         };
260         addStatJob(isExpecedStatistics);
261         return checkStatId;
262     }
263
264     @Override
265     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
266         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
267         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
268
269         final RpcJobsQueue addNotification = new RpcJobsQueue() {
270
271             @Override
272             public Void call() throws Exception {
273                 final TransactionId txId = notification.getTransactionId();
274                 final String key = buildCacheKey(txId, nodeId);
275                 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
276                 if (container != null) {
277                     container.addNotif(notification);
278                 }
279                 return null;
280             }
281         };
282         addStatJob(addNotification);
283     }
284
285     @Override
286     public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
287         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
288         final SettableFuture<TransactionId> result = SettableFuture.create();
289         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
290
291             @Override
292             public Void call() throws Exception {
293                 final GetAllGroupStatisticsInputBuilder builder =
294                         new GetAllGroupStatisticsInputBuilder();
295                 builder.setNode(nodeRef);
296                 registrationRpcFutureCallBack(groupStatsService
297                         .getAllGroupStatistics(builder.build()), null, nodeRef, result);
298                 return null;
299             }
300         };
301         addGetAllStatJob(getAllGroupStat);
302         return result;
303     }
304
305     @Override
306     public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
307         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
308         final SettableFuture<TransactionId> result = SettableFuture.create();
309         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
310
311             @Override
312             public Void call() throws Exception {
313                 final GetAllMeterStatisticsInputBuilder builder =
314                         new GetAllMeterStatisticsInputBuilder();
315                 builder.setNode(nodeRef);
316                 registrationRpcFutureCallBack(meterStatsService
317                         .getAllMeterStatistics(builder.build()), null, nodeRef, result);
318                 return null;
319             }
320         };
321         addGetAllStatJob(getAllMeterStat);
322         return result;
323     }
324
325     @Override
326     public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
327         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
328         final SettableFuture<TransactionId> result = SettableFuture.create();
329         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
330
331             @Override
332             public Void call() throws Exception {
333                 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
334                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
335                 builder.setNode(nodeRef);
336                 registrationRpcFutureCallBack(flowStatsService
337                         .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
338                 return null;
339             }
340         };
341         addGetAllStatJob(getAllFlowStat);
342         return result;
343     }
344
345     @Override
346     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
347         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
348         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
349         final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
350
351             @Override
352             public Void call() throws Exception {
353                 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
354                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
355                 builder.setNode(nodeRef);
356                 builder.setTableId(tableId);
357
358                 final TableBuilder tbuilder = new TableBuilder();
359                 tbuilder.setId(tableId.getValue());
360                 tbuilder.setKey(new TableKey(tableId.getValue()));
361                 registrationRpcFutureCallBack(flowStatsService
362                         .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
363                 return null;
364             }
365         };
366         addGetAllStatJob(getAggregateFlowStat);
367     }
368
369     @Override
370     public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
371         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
372         final SettableFuture<TransactionId> result = SettableFuture.create();
373         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
374
375             @Override
376             public Void call() throws Exception {
377                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
378                         new GetAllNodeConnectorsStatisticsInputBuilder();
379                 builder.setNode(nodeRef);
380                 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
381                         portStatsService.getAllNodeConnectorsStatistics(builder.build());
382                 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
383                 return null;
384             }
385         };
386         addGetAllStatJob(getAllPortsStat);
387         return result;
388     }
389
390     @Override
391     public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
392         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
393         final SettableFuture<TransactionId> result = SettableFuture.create();
394         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
395
396             @Override
397             public Void call() throws Exception {
398                 final GetFlowTablesStatisticsInputBuilder builder =
399                         new GetFlowTablesStatisticsInputBuilder();
400                 builder.setNode(nodeRef);
401                 registrationRpcFutureCallBack(flowTableStatsService
402                         .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
403                 return null;
404             }
405         };
406         addGetAllStatJob(getAllTableStat);
407         return result;
408     }
409
410     @Override
411     public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
412         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
413         final SettableFuture<TransactionId> result = SettableFuture.create();
414         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
415
416             @Override
417             public Void call() throws Exception {
418                 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
419                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
420                 builder.setNode(nodeRef);
421                 registrationRpcFutureCallBack(queueStatsService
422                         .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
423                 return null;
424             }
425         };
426         addGetAllStatJob(getAllQueueStat);
427         return result;
428     }
429
430     @Override
431     public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
432         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
433         final SettableFuture<TransactionId> result = SettableFuture.create();
434         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
435
436             @Override
437             public Void call() throws Exception {
438                 final GetAllMeterConfigStatisticsInputBuilder builder =
439                         new GetAllMeterConfigStatisticsInputBuilder();
440                 builder.setNode(nodeRef);
441                 registrationRpcFutureCallBack(meterStatsService
442                         .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
443                 return null;
444             }
445         };
446         addGetAllStatJob(qetAllMeterConfStat);
447         return result;
448     }
449
450     @Override
451     public void getGroupFeaturesStat(final NodeRef nodeRef) {
452         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
453         final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
454
455             @Override
456             public Void call() throws Exception {
457                 /* RPC input */
458                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
459                 input.setNode(nodeRef);
460                 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
461                 return null;
462             }
463         };
464         addStatJob(getGroupFeaturesStat);
465     }
466
467     @Override
468     public void getMeterFeaturesStat(final NodeRef nodeRef) {
469         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
470         final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
471
472             @Override
473             public Void call() throws Exception {
474                 /* RPC input */
475                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
476                 input.setNode(nodeRef);
477                 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
478                 return null;
479             }
480         };
481         addStatJob(getMeterFeaturesStat);
482     }
483
484     @Override
485     public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
486         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
487         final SettableFuture<TransactionId> result = SettableFuture.create();
488         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
489
490             @Override
491             public Void call() throws Exception {
492                 final GetGroupDescriptionInputBuilder builder =
493                         new GetGroupDescriptionInputBuilder();
494                 builder.setNode(nodeRef);
495                 registrationRpcFutureCallBack(groupStatsService
496                         .getGroupDescription(builder.build()), null, nodeRef, result);
497
498                 return null;
499             }
500         };
501         addGetAllStatJob(getAllGropConfStat);
502         return result;
503     }
504
505     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
506
507         private final TransactionId id;
508         private final NodeId nId;
509         private final List<T> notifications;
510         private final Optional<? extends DataObject> confInput;
511
512         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
513             this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
514             notifications = new CopyOnWriteArrayList<T>();
515             confInput = Optional.fromNullable(input);
516             nId = nodeId;
517         }
518
519         @Override
520         public void addNotif(final T notif) {
521             notifications.add(notif);
522         }
523
524         @Override
525         public TransactionId getId() {
526             return id;
527         }
528
529         @Override
530         public NodeId getNodeId() {
531             return nId;
532         }
533
534         @Override
535         public List<T> getNotifications() {
536             return notifications;
537         }
538
539         @Override
540         public Optional<? extends DataObject> getConfInput() {
541             return confInput;
542         }
543     }
544 }
545