Merge "BUG 720 - YANG leaf as JSON input *<*:* couldn't be saved"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.List;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.CopyOnWriteArrayList;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
17
18 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
19 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
47 import org.opendaylight.yangtools.yang.binding.DataObject;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.base.Optional;
53 import com.google.common.base.Preconditions;
54 import com.google.common.cache.Cache;
55 import com.google.common.cache.CacheBuilder;
56 import com.google.common.util.concurrent.FutureCallback;
57 import com.google.common.util.concurrent.Futures;
58 import com.google.common.util.concurrent.JdkFutureAdapters;
59 import com.google.common.util.concurrent.SettableFuture;
60
61
62 /**
63  * statistics-manager
64  * org.opendaylight.controller.md.statistics.manager.impl
65  *
66  * StatRpcMsgManagerImpl
67  * Class register and provide all RPC Statistics Device Services and implement pre-defined
68  * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
69  *
70  * In next Class implement process for joining multipart messages.
71  * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
72  * One Weak map is used for holding all Multipart Messages and second is used for possible input
73  * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
74  * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
75  *
76  * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
77  *
78  */
79 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
80
81     private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
82
83     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
84
85     private final long maxLifeForRequest = 50; /* 50 second */
86     private final int queueCapacity = 5000;
87
88     private final OpendaylightGroupStatisticsService groupStatsService;
89     private final OpendaylightMeterStatisticsService meterStatsService;
90     private final OpendaylightFlowStatisticsService flowStatsService;
91     private final OpendaylightPortStatisticsService portStatsService;
92     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
93     private final OpendaylightQueueStatisticsService queueStatsService;
94
95     private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
96
97     private volatile boolean finishing = false;
98
99     public StatRpcMsgManagerImpl (final StatisticsManager manager,
100             final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
101         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
102         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
103         groupStatsService = Preconditions.checkNotNull(
104                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
105                 "OpendaylightGroupStatisticsService can not be null!");
106         meterStatsService = Preconditions.checkNotNull(
107                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
108                 "OpendaylightMeterStatisticsService can not be null!");
109         flowStatsService = Preconditions.checkNotNull(
110                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
111                 "OpendaylightFlowStatisticsService can not be null!");
112         portStatsService = Preconditions.checkNotNull(
113                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
114                 "OpendaylightPortStatisticsService can not be null!");
115         flowTableStatsService = Preconditions.checkNotNull(
116                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
117                 "OpendaylightFlowTableStatisticsService can not be null!");
118         queueStatsService = Preconditions.checkNotNull(
119                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
120                 "OpendaylightQueueStatisticsService can not be null!");
121
122         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
123         txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
124                 .maximumSize(10000).build();
125     }
126
127     @Override
128     public void close() {
129         finishing = true;
130         statsRpcJobQueue = null;
131     }
132
133     @Override
134     public void run() {
135          /* Neverending cyle - wait for finishing */
136         while ( ! finishing) {
137             try {
138                 statsRpcJobQueue.take().call();
139             }
140             catch (final Exception e) {
141                 LOG.warn("Stat Element RPC executor fail!", e);
142             }
143         }
144         // Drain all rpcCall, making sure any blocked threads are unblocked
145         while ( ! statsRpcJobQueue.isEmpty()) {
146             statsRpcJobQueue.poll();
147         }
148     }
149
150     private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
151         final boolean success = statsRpcJobQueue.offer(getAllStatJob);
152         if ( ! success) {
153             LOG.warn("Put RPC request getAllStat fail! Queue is full.");
154         }
155     }
156
157     private void addStatJob(final RpcJobsQueue getStatJob) {
158         final boolean success = statsRpcJobQueue.offer(getStatJob);
159         if ( ! success) {
160             LOG.debug("Put RPC request for getStat fail! Queue is full.");
161         }
162     }
163
164     @Override
165     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
166             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
167
168         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
169                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
170
171             @Override
172             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
173                 final TransactionId id = result.getResult().getTransactionId();
174                 if (id == null) {
175                     LOG.warn("No protocol support");
176                 } else {
177                     final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
178                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
179                     final TransactionCacheContainer<? super TransactionAware> container =
180                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
181                     txCache.put(cacheKey, container);
182                 }
183             }
184
185             @Override
186             public void onFailure(final Throwable t) {
187                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
188             }
189
190         });
191     }
192
193     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
194         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
195     }
196
197     @Override
198     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
199             final TransactionId id, final NodeId nodeId) {
200         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
201         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
202
203         final String key = buildCacheKey(id, nodeId);
204         final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
205
206         final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
207
208             @Override
209             public Void call() throws Exception {
210                 final Optional<TransactionCacheContainer<?>> resultContainer =
211                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
212                 if (resultContainer.isPresent()) {
213                     txCache.invalidate(key);
214                 }
215                 result.set(resultContainer);
216                 return null;
217             }
218         };
219         addStatJob(getTransactionCacheContainer);
220         return result;
221     }
222
223     @Override
224     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
225         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
226         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
227
228         final String key = buildCacheKey(id, nodeId);
229         final SettableFuture<Boolean> checkStatId = SettableFuture.create();
230
231         final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
232
233             @Override
234             public Void call() throws Exception {
235                 final Optional<TransactionCacheContainer<?>> result =
236                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
237                 checkStatId.set(Boolean.valueOf(result.isPresent()));
238                 return null;
239             }
240         };
241         addStatJob(isExpecedStatistics);
242         return checkStatId;
243     }
244
245     @Override
246     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
247         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
248         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
249
250         final RpcJobsQueue addNotification = new RpcJobsQueue() {
251
252             @Override
253             public Void call() throws Exception {
254                 final TransactionId txId = notification.getTransactionId();
255                 final String key = buildCacheKey(txId, nodeId);
256                 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
257                 if (container != null) {
258                     container.addNotif(notification);
259                 }
260                 return null;
261             }
262         };
263         addStatJob(addNotification);
264     }
265
266     @Override
267     public void getAllGroupsStat(final NodeRef nodeRef) {
268         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
269         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
270
271             @Override
272             public Void call() throws Exception {
273                 final GetAllGroupStatisticsInputBuilder builder =
274                         new GetAllGroupStatisticsInputBuilder();
275                 builder.setNode(nodeRef);
276                 registrationRpcFutureCallBack(groupStatsService
277                         .getAllGroupStatistics(builder.build()), null, nodeRef);
278                 return null;
279             }
280         };
281         addGetAllStatJob(getAllGroupStat);
282     }
283
284     @Override
285     public void getAllMetersStat(final NodeRef nodeRef) {
286         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
287         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
288
289             @Override
290             public Void call() throws Exception {
291                 final GetAllMeterStatisticsInputBuilder builder =
292                         new GetAllMeterStatisticsInputBuilder();
293                 builder.setNode(nodeRef);
294                 registrationRpcFutureCallBack(meterStatsService
295                         .getAllMeterStatistics(builder.build()), null, nodeRef);
296                 return null;
297             }
298         };
299         addGetAllStatJob(getAllMeterStat);
300     }
301
302     @Override
303     public void getAllFlowsStat(final NodeRef nodeRef) {
304         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
305         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
306
307             @Override
308             public Void call() throws Exception {
309                 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
310                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
311                 builder.setNode(nodeRef);
312                 registrationRpcFutureCallBack(flowStatsService
313                         .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
314                 return null;
315             }
316         };
317         addGetAllStatJob(getAllFlowStat);
318     }
319
320     @Override
321     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
322         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
323         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
324         final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
325
326             @Override
327             public Void call() throws Exception {
328                 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
329                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
330                 builder.setNode(nodeRef);
331                 builder.setTableId(tableId);
332
333                 final TableBuilder tbuilder = new TableBuilder();
334                 tbuilder.setId(tableId.getValue());
335                 tbuilder.setKey(new TableKey(tableId.getValue()));
336                 registrationRpcFutureCallBack(flowStatsService
337                         .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
338                 return null;
339             }
340         };
341         addGetAllStatJob(getAggregateFlowStat);
342     }
343
344     @Override
345     public void getAllPortsStat(final NodeRef nodeRef) {
346         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
347         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
348
349             @Override
350             public Void call() throws Exception {
351                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
352                         new GetAllNodeConnectorsStatisticsInputBuilder();
353                 builder.setNode(nodeRef);
354                 registrationRpcFutureCallBack(portStatsService
355                         .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
356                 return null;
357             }
358         };
359         addGetAllStatJob(getAllPortsStat);
360     }
361
362     @Override
363     public void getAllTablesStat(final NodeRef nodeRef) {
364         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
365         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
366
367             @Override
368             public Void call() throws Exception {
369                 final GetFlowTablesStatisticsInputBuilder builder =
370                         new GetFlowTablesStatisticsInputBuilder();
371                 builder.setNode(nodeRef);
372                 registrationRpcFutureCallBack(flowTableStatsService
373                         .getFlowTablesStatistics(builder.build()), null, nodeRef);
374                 return null;
375             }
376         };
377         addGetAllStatJob(getAllTableStat);
378     }
379
380     @Override
381     public void getAllQueueStat(final NodeRef nodeRef) {
382         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
383         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
384
385             @Override
386             public Void call() throws Exception {
387                 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
388                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
389                 builder.setNode(nodeRef);
390                 registrationRpcFutureCallBack(queueStatsService
391                         .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
392                 return null;
393             }
394         };
395         addGetAllStatJob(getAllQueueStat);
396     }
397
398     @Override
399     public void getAllMeterConfigStat(final NodeRef nodeRef) {
400         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
401         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
402
403             @Override
404             public Void call() throws Exception {
405                 final GetAllMeterConfigStatisticsInputBuilder builder =
406                         new GetAllMeterConfigStatisticsInputBuilder();
407                 builder.setNode(nodeRef);
408                 registrationRpcFutureCallBack(meterStatsService
409                         .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
410                 return null;
411             }
412         };
413         addGetAllStatJob(qetAllMeterConfStat);
414     }
415
416     @Override
417     public void getGroupFeaturesStat(final NodeRef nodeRef) {
418         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
419         final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
420
421             @Override
422             public Void call() throws Exception {
423                 /* RPC input */
424                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
425                 input.setNode(nodeRef);
426                 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
427                 return null;
428             }
429         };
430         addStatJob(getGroupFeaturesStat);
431     }
432
433     @Override
434     public void getMeterFeaturesStat(final NodeRef nodeRef) {
435         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
436         final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
437
438             @Override
439             public Void call() throws Exception {
440                 /* RPC input */
441                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
442                 input.setNode(nodeRef);
443                 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
444                 return null;
445             }
446         };
447         addStatJob(getMeterFeaturesStat);
448     }
449
450     @Override
451     public void getAllGroupsConfStats(final NodeRef nodeRef) {
452         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
453         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
454
455             @Override
456             public Void call() throws Exception {
457                 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
458                 final GetGroupDescriptionInputBuilder builder =
459                         new GetGroupDescriptionInputBuilder();
460                 builder.setNode(nodeRef);
461                 registrationRpcFutureCallBack(groupStatsService
462                         .getGroupDescription(builder.build()), null, nodeRef);
463
464                 return null;
465             }
466         };
467         addGetAllStatJob(getAllGropConfStat);
468     }
469
470     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
471
472         private final TransactionId id;
473         private final NodeId nId;
474         private final List<T> notifications;
475         private final Optional<? extends DataObject> confInput;
476
477         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
478             this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
479             notifications = new CopyOnWriteArrayList<T>();
480             confInput = Optional.fromNullable(input);
481             nId = nodeId;
482         }
483
484         @Override
485         public void addNotif(final T notif) {
486             notifications.add(notif);
487         }
488
489         @Override
490         public TransactionId getId() {
491             return id;
492         }
493
494         @Override
495         public NodeId getNodeId() {
496             return nId;
497         }
498
499         @Override
500         public List<T> getNotifications() {
501             return notifications;
502         }
503
504         @Override
505         public Optional<? extends DataObject> getConfInput() {
506             return confInput;
507         }
508     }
509 }
510