Bug 2552 - Fix statistics manager log messages
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
20 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
21 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
63
64
/**
 * statistics-manager
 * org.opendaylight.controller.md.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * This class registers and provides all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to those services, such as getAllStatisticsFor...
 *
 * The class also implements the process of joining multipart messages.
 * Internally it uses two WeakHashMaps and a Guava Cache to hold the values needed for joining multipart messages.
 * One weak map holds all multipart messages and the second one holds the optional input
 * Config/DS light-weight DataObject (a DataObject that contains only the necessary identification fields, such as
 * TableId, GroupId, MeterId or, for a flow, Match, Priority, FlowCookie, TableId and FlowId ...).
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 *
 */
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
83
84     private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
85
86     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
87
88     private final int queueCapacity = 5000;
89
90     private final OpendaylightGroupStatisticsService groupStatsService;
91     private final OpendaylightMeterStatisticsService meterStatsService;
92     private final OpendaylightFlowStatisticsService flowStatsService;
93     private final OpendaylightPortStatisticsService portStatsService;
94     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
95     private final OpendaylightQueueStatisticsService queueStatsService;
96
97     private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
98
99     private volatile boolean finishing = false;
100
101     public StatRpcMsgManagerImpl (final StatisticsManager manager,
102             final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
103         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
104         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
105         groupStatsService = Preconditions.checkNotNull(
106                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
107                 "OpendaylightGroupStatisticsService can not be null!");
108         meterStatsService = Preconditions.checkNotNull(
109                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
110                 "OpendaylightMeterStatisticsService can not be null!");
111         flowStatsService = Preconditions.checkNotNull(
112                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
113                 "OpendaylightFlowStatisticsService can not be null!");
114         portStatsService = Preconditions.checkNotNull(
115                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
116                 "OpendaylightPortStatisticsService can not be null!");
117         flowTableStatsService = Preconditions.checkNotNull(
118                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
119                 "OpendaylightFlowTableStatisticsService can not be null!");
120         queueStatsService = Preconditions.checkNotNull(
121                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
122                 "OpendaylightQueueStatisticsService can not be null!");
123
124         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
125         /* nr. 7 is here nr. of possible statistic which are waiting for notification
126          *      - check it in StatPermCollectorImpl method collectStatCrossNetwork */
127         txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
128                 .maximumSize(10000).build();
129     }
130
131     @Override
132     public void close() {
133         finishing = true;
134         statsRpcJobQueue = null;
135     }
136
137     @Override
138     public void run() {
139          /* Neverending cyle - wait for finishing */
140         while ( ! finishing) {
141             try {
142                 statsRpcJobQueue.take().call();
143             }
144             catch (final Exception e) {
145                 LOG.warn("Stat Element RPC executor fail!", e);
146             }
147         }
148         // Drain all rpcCall, making sure any blocked threads are unblocked
149         while ( ! statsRpcJobQueue.isEmpty()) {
150             statsRpcJobQueue.poll();
151         }
152     }
153
154     private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
155         final boolean success = statsRpcJobQueue.offer(getAllStatJob);
156         if ( ! success) {
157             LOG.warn("Put RPC request getAllStat fail! Queue is full.");
158         }
159     }
160
161     private void addStatJob(final RpcJobsQueue getStatJob) {
162         final boolean success = statsRpcJobQueue.offer(getStatJob);
163         if ( ! success) {
164             LOG.debug("Put RPC request for getStat fail! Queue is full.");
165         }
166     }
167
168     @Override
169     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
170             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
171             final SettableFuture<TransactionId> resultTransId) {
172
173         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
174                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
175
176             @Override
177             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
178                 final TransactionId id = result.getResult().getTransactionId();
179                 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
180                 if (id == null) {
181                     String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
182                     LOG.warn("Node [{}] does not support statistics request type : {}",
183                             nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
184                 } else {
185                     if (resultTransId != null) {
186                         resultTransId.set(id);
187                     }
188                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
189                     final TransactionCacheContainer<? super TransactionAware> container =
190                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
191                     txCache.put(cacheKey, container);
192                 }
193             }
194
195             @Override
196             public void onFailure(final Throwable t) {
197                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
198             }
199
200         });
201     }
202
203     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
204         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
205     }
206
207     @Override
208     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
209             final TransactionId id, final NodeId nodeId) {
210         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
211         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
212
213         final String key = buildCacheKey(id, nodeId);
214         final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
215
216         final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
217
218             @Override
219             public Void call() throws Exception {
220                 final Optional<TransactionCacheContainer<?>> resultContainer =
221                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
222                 if (resultContainer.isPresent()) {
223                     txCache.invalidate(key);
224                 }
225                 result.set(resultContainer);
226                 return null;
227             }
228         };
229         addStatJob(getTransactionCacheContainer);
230         return result;
231     }
232
233     @Override
234     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
235         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
236         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
237
238         final String key = buildCacheKey(id, nodeId);
239         final SettableFuture<Boolean> checkStatId = SettableFuture.create();
240
241         final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
242
243             @Override
244             public Void call() throws Exception {
245                 final Optional<TransactionCacheContainer<?>> result =
246                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
247                 checkStatId.set(Boolean.valueOf(result.isPresent()));
248                 return null;
249             }
250         };
251         addStatJob(isExpecedStatistics);
252         return checkStatId;
253     }
254
255     @Override
256     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
257         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
258         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
259
260         final RpcJobsQueue addNotification = new RpcJobsQueue() {
261
262             @Override
263             public Void call() throws Exception {
264                 final TransactionId txId = notification.getTransactionId();
265                 final String key = buildCacheKey(txId, nodeId);
266                 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
267                 if (container != null) {
268                     container.addNotif(notification);
269                 }
270                 return null;
271             }
272         };
273         addStatJob(addNotification);
274     }
275
276     @Override
277     public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
278         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
279         final SettableFuture<TransactionId> result = SettableFuture.create();
280         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
281
282             @Override
283             public Void call() throws Exception {
284                 final GetAllGroupStatisticsInputBuilder builder =
285                         new GetAllGroupStatisticsInputBuilder();
286                 builder.setNode(nodeRef);
287                 registrationRpcFutureCallBack(groupStatsService
288                         .getAllGroupStatistics(builder.build()), null, nodeRef, result);
289                 return null;
290             }
291         };
292         addGetAllStatJob(getAllGroupStat);
293         return result;
294     }
295
296     @Override
297     public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
298         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
299         final SettableFuture<TransactionId> result = SettableFuture.create();
300         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
301
302             @Override
303             public Void call() throws Exception {
304                 final GetAllMeterStatisticsInputBuilder builder =
305                         new GetAllMeterStatisticsInputBuilder();
306                 builder.setNode(nodeRef);
307                 registrationRpcFutureCallBack(meterStatsService
308                         .getAllMeterStatistics(builder.build()), null, nodeRef, result);
309                 return null;
310             }
311         };
312         addGetAllStatJob(getAllMeterStat);
313         return result;
314     }
315
316     @Override
317     public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
318         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
319         final SettableFuture<TransactionId> result = SettableFuture.create();
320         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
321
322             @Override
323             public Void call() throws Exception {
324                 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
325                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
326                 builder.setNode(nodeRef);
327                 registrationRpcFutureCallBack(flowStatsService
328                         .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
329                 return null;
330             }
331         };
332         addGetAllStatJob(getAllFlowStat);
333         return result;
334     }
335
336     @Override
337     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
338         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
339         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
340         final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
341
342             @Override
343             public Void call() throws Exception {
344                 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
345                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
346                 builder.setNode(nodeRef);
347                 builder.setTableId(tableId);
348
349                 final TableBuilder tbuilder = new TableBuilder();
350                 tbuilder.setId(tableId.getValue());
351                 tbuilder.setKey(new TableKey(tableId.getValue()));
352                 registrationRpcFutureCallBack(flowStatsService
353                         .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
354                 return null;
355             }
356         };
357         addGetAllStatJob(getAggregateFlowStat);
358     }
359
360     @Override
361     public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
362         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
363         final SettableFuture<TransactionId> result = SettableFuture.create();
364         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
365
366             @Override
367             public Void call() throws Exception {
368                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
369                         new GetAllNodeConnectorsStatisticsInputBuilder();
370                 builder.setNode(nodeRef);
371                 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
372                         portStatsService.getAllNodeConnectorsStatistics(builder.build());
373                 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
374                 return null;
375             }
376         };
377         addGetAllStatJob(getAllPortsStat);
378         return result;
379     }
380
381     @Override
382     public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
383         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
384         final SettableFuture<TransactionId> result = SettableFuture.create();
385         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
386
387             @Override
388             public Void call() throws Exception {
389                 final GetFlowTablesStatisticsInputBuilder builder =
390                         new GetFlowTablesStatisticsInputBuilder();
391                 builder.setNode(nodeRef);
392                 registrationRpcFutureCallBack(flowTableStatsService
393                         .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
394                 return null;
395             }
396         };
397         addGetAllStatJob(getAllTableStat);
398         return result;
399     }
400
401     @Override
402     public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
403         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
404         final SettableFuture<TransactionId> result = SettableFuture.create();
405         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
406
407             @Override
408             public Void call() throws Exception {
409                 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
410                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
411                 builder.setNode(nodeRef);
412                 registrationRpcFutureCallBack(queueStatsService
413                         .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
414                 return null;
415             }
416         };
417         addGetAllStatJob(getAllQueueStat);
418         return result;
419     }
420
421     @Override
422     public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
423         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
424         final SettableFuture<TransactionId> result = SettableFuture.create();
425         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
426
427             @Override
428             public Void call() throws Exception {
429                 final GetAllMeterConfigStatisticsInputBuilder builder =
430                         new GetAllMeterConfigStatisticsInputBuilder();
431                 builder.setNode(nodeRef);
432                 registrationRpcFutureCallBack(meterStatsService
433                         .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
434                 return null;
435             }
436         };
437         addGetAllStatJob(qetAllMeterConfStat);
438         return result;
439     }
440
441     @Override
442     public void getGroupFeaturesStat(final NodeRef nodeRef) {
443         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
444         final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
445
446             @Override
447             public Void call() throws Exception {
448                 /* RPC input */
449                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
450                 input.setNode(nodeRef);
451                 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
452                 return null;
453             }
454         };
455         addStatJob(getGroupFeaturesStat);
456     }
457
458     @Override
459     public void getMeterFeaturesStat(final NodeRef nodeRef) {
460         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
461         final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
462
463             @Override
464             public Void call() throws Exception {
465                 /* RPC input */
466                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
467                 input.setNode(nodeRef);
468                 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
469                 return null;
470             }
471         };
472         addStatJob(getMeterFeaturesStat);
473     }
474
475     @Override
476     public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
477         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
478         final SettableFuture<TransactionId> result = SettableFuture.create();
479         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
480
481             @Override
482             public Void call() throws Exception {
483                 final GetGroupDescriptionInputBuilder builder =
484                         new GetGroupDescriptionInputBuilder();
485                 builder.setNode(nodeRef);
486                 registrationRpcFutureCallBack(groupStatsService
487                         .getGroupDescription(builder.build()), null, nodeRef, result);
488
489                 return null;
490             }
491         };
492         addGetAllStatJob(getAllGropConfStat);
493         return result;
494     }
495
496     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
497
498         private final TransactionId id;
499         private final NodeId nId;
500         private final List<T> notifications;
501         private final Optional<? extends DataObject> confInput;
502
503         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
504             this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
505             notifications = new CopyOnWriteArrayList<T>();
506             confInput = Optional.fromNullable(input);
507             nId = nodeId;
508         }
509
510         @Override
511         public void addNotif(final T notif) {
512             notifications.add(notif);
513         }
514
515         @Override
516         public TransactionId getId() {
517             return id;
518         }
519
520         @Override
521         public NodeId getNodeId() {
522             return nId;
523         }
524
525         @Override
526         public List<T> getNotifications() {
527             return notifications;
528         }
529
530         @Override
531         public Optional<? extends DataObject> getConfInput() {
532             return confInput;
533         }
534     }
535 }
536