Merge "Bug 1948: Separate out restconf features."
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.List;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.CopyOnWriteArrayList;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
17
18 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
19 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
20 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
21 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
22 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Optional;
55 import com.google.common.base.Preconditions;
56 import com.google.common.cache.Cache;
57 import com.google.common.cache.CacheBuilder;
58 import com.google.common.util.concurrent.FutureCallback;
59 import com.google.common.util.concurrent.Futures;
60 import com.google.common.util.concurrent.JdkFutureAdapters;
61 import com.google.common.util.concurrent.SettableFuture;
62
63
64 /**
65  * statistics-manager
66  * org.opendaylight.controller.md.statistics.manager.impl
67  *
68  * StatRpcMsgManagerImpl
69  * Class register and provide all RPC Statistics Device Services and implement pre-defined
70  * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
71  *
72  * In next Class implement process for joining multipart messages.
73  * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
74  * One Weak map is used for holding all Multipart Messages and second is used for possible input
75  * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
76  * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
77  *
78  * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
79  *
80  */
81 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
82
83     private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
84
85     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
86
87     private final long maxLifeForRequest = 50; /* 50 second */
88     private final int queueCapacity = 5000;
89     private final StatisticsManager manager;
90
91     private final OpendaylightGroupStatisticsService groupStatsService;
92     private final OpendaylightMeterStatisticsService meterStatsService;
93     private final OpendaylightFlowStatisticsService flowStatsService;
94     private final OpendaylightPortStatisticsService portStatsService;
95     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
96     private final OpendaylightQueueStatisticsService queueStatsService;
97
98     private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
99
100     private volatile boolean finishing = false;
101
102     public StatRpcMsgManagerImpl (final StatisticsManager manager,
103             final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
104         this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
105         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
106         groupStatsService = Preconditions.checkNotNull(
107                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
108                 "OpendaylightGroupStatisticsService can not be null!");
109         meterStatsService = Preconditions.checkNotNull(
110                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
111                 "OpendaylightMeterStatisticsService can not be null!");
112         flowStatsService = Preconditions.checkNotNull(
113                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
114                 "OpendaylightFlowStatisticsService can not be null!");
115         portStatsService = Preconditions.checkNotNull(
116                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
117                 "OpendaylightPortStatisticsService can not be null!");
118         flowTableStatsService = Preconditions.checkNotNull(
119                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
120                 "OpendaylightFlowTableStatisticsService can not be null!");
121         queueStatsService = Preconditions.checkNotNull(
122                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
123                 "OpendaylightQueueStatisticsService can not be null!");
124
125         statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
126         txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
127                 .maximumSize(10000).build();
128     }
129
130     @Override
131     public void close() {
132         finishing = true;
133         statsRpcJobQueue = null;
134     }
135
136     @Override
137     public void run() {
138          /* Neverending cyle - wait for finishing */
139         while ( ! finishing) {
140             try {
141                 statsRpcJobQueue.take().call();
142             }
143             catch (final Exception e) {
144                 LOG.warn("Stat Element RPC executor fail!", e);
145             }
146         }
147         // Drain all rpcCall, making sure any blocked threads are unblocked
148         while ( ! statsRpcJobQueue.isEmpty()) {
149             statsRpcJobQueue.poll();
150         }
151     }
152
153     private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
154         final boolean success = statsRpcJobQueue.offer(getAllStatJob);
155         if ( ! success) {
156             LOG.warn("Put RPC request getAllStat fail! Queue is full.");
157         }
158     }
159
160     private void addStatJob(final RpcJobsQueue getStatJob) {
161         final boolean success = statsRpcJobQueue.offer(getStatJob);
162         if ( ! success) {
163             LOG.debug("Put RPC request for getStat fail! Queue is full.");
164         }
165     }
166
167     @Override
168     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
169             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
170
171         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
172                 new FutureCallback<RpcResult<? extends TransactionAware>>() {
173
174             @Override
175             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
176                 final TransactionId id = result.getResult().getTransactionId();
177                 if (id == null) {
178                     LOG.warn("No protocol support");
179                 } else {
180                     final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
181                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
182                     final TransactionCacheContainer<? super TransactionAware> container =
183                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
184                     txCache.put(cacheKey, container);
185                 }
186             }
187
188             @Override
189             public void onFailure(final Throwable t) {
190                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
191             }
192
193         });
194     }
195
196     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
197         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
198     }
199
200     @Override
201     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
202             final TransactionId id, final NodeId nodeId) {
203         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
204         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
205
206         final String key = buildCacheKey(id, nodeId);
207         final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
208
209         final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
210
211             @Override
212             public Void call() throws Exception {
213                 final Optional<TransactionCacheContainer<?>> resultContainer =
214                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
215                 if (resultContainer.isPresent()) {
216                     txCache.invalidate(key);
217                 }
218                 result.set(resultContainer);
219                 return null;
220             }
221         };
222         addStatJob(getTransactionCacheContainer);
223         return result;
224     }
225
226     @Override
227     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
228         Preconditions.checkArgument(id != null, "TransactionId can not be null!");
229         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
230
231         final String key = buildCacheKey(id, nodeId);
232         final SettableFuture<Boolean> checkStatId = SettableFuture.create();
233
234         final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
235
236             @Override
237             public Void call() throws Exception {
238                 final Optional<TransactionCacheContainer<?>> result =
239                         Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
240                 checkStatId.set(Boolean.valueOf(result.isPresent()));
241                 return null;
242             }
243         };
244         addStatJob(isExpecedStatistics);
245         return checkStatId;
246     }
247
248     @Override
249     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
250         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
251         Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
252
253         final RpcJobsQueue addNotification = new RpcJobsQueue() {
254
255             @Override
256             public Void call() throws Exception {
257                 final TransactionId txId = notification.getTransactionId();
258                 final String key = buildCacheKey(txId, nodeId);
259                 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
260                 if (container != null) {
261                     container.addNotif(notification);
262                 }
263                 return null;
264             }
265         };
266         addStatJob(addNotification);
267     }
268
269     @Override
270     public void getAllGroupsStat(final NodeRef nodeRef) {
271         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
272         final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
273
274             @Override
275             public Void call() throws Exception {
276                 final GetAllGroupStatisticsInputBuilder builder =
277                         new GetAllGroupStatisticsInputBuilder();
278                 builder.setNode(nodeRef);
279                 registrationRpcFutureCallBack(groupStatsService
280                         .getAllGroupStatistics(builder.build()), null, nodeRef);
281                 return null;
282             }
283         };
284         addGetAllStatJob(getAllGroupStat);
285     }
286
287     @Override
288     public void getAllMetersStat(final NodeRef nodeRef) {
289         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
290         final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
291
292             @Override
293             public Void call() throws Exception {
294                 final GetAllMeterStatisticsInputBuilder builder =
295                         new GetAllMeterStatisticsInputBuilder();
296                 builder.setNode(nodeRef);
297                 registrationRpcFutureCallBack(meterStatsService
298                         .getAllMeterStatistics(builder.build()), null, nodeRef);
299                 return null;
300             }
301         };
302         addGetAllStatJob(getAllMeterStat);
303     }
304
305     @Override
306     public void getAllFlowsStat(final NodeRef nodeRef) {
307         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
308         final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
309
310             @Override
311             public Void call() throws Exception {
312                 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
313                         new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
314                 builder.setNode(nodeRef);
315                 registrationRpcFutureCallBack(flowStatsService
316                         .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
317                 return null;
318             }
319         };
320         addGetAllStatJob(getAllFlowStat);
321     }
322
323     @Override
324     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
325
326         manager.enqueue(new StatDataStoreOperation() {
327
328             @Override
329             public void applyOperation(final ReadWriteTransaction tx) {
330                 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
331                     @Override
332                     public Void call() throws Exception {
333                         final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
334                                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
335                         builder.setNode(nodeRef);
336                         builder.setTableId(tableId);
337
338                         final TableBuilder tbuilder = new TableBuilder();
339                         tbuilder.setId(tableId.getValue());
340                         tbuilder.setKey(new TableKey(tableId.getValue()));
341                         registrationRpcFutureCallBack(flowStatsService
342                                 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
343                         return null;
344                     }
345                 };
346                 addGetAllStatJob(getAggregateFlowStat);
347             }
348         });
349     }
350
351     @Override
352     public void getAllPortsStat(final NodeRef nodeRef) {
353         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
354         final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
355
356             @Override
357             public Void call() throws Exception {
358                 final GetAllNodeConnectorsStatisticsInputBuilder builder =
359                         new GetAllNodeConnectorsStatisticsInputBuilder();
360                 builder.setNode(nodeRef);
361                 registrationRpcFutureCallBack(portStatsService
362                         .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
363                 return null;
364             }
365         };
366         addGetAllStatJob(getAllPortsStat);
367     }
368
369     @Override
370     public void getAllTablesStat(final NodeRef nodeRef) {
371         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
372         final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
373
374             @Override
375             public Void call() throws Exception {
376                 final GetFlowTablesStatisticsInputBuilder builder =
377                         new GetFlowTablesStatisticsInputBuilder();
378                 builder.setNode(nodeRef);
379                 registrationRpcFutureCallBack(flowTableStatsService
380                         .getFlowTablesStatistics(builder.build()), null, nodeRef);
381                 return null;
382             }
383         };
384         addGetAllStatJob(getAllTableStat);
385     }
386
387     @Override
388     public void getAllQueueStat(final NodeRef nodeRef) {
389         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
390         final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
391
392             @Override
393             public Void call() throws Exception {
394                 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
395                         new GetAllQueuesStatisticsFromAllPortsInputBuilder();
396                 builder.setNode(nodeRef);
397                 registrationRpcFutureCallBack(queueStatsService
398                         .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
399                 return null;
400             }
401         };
402         addGetAllStatJob(getAllQueueStat);
403     }
404
405     @Override
406     public void getAllMeterConfigStat(final NodeRef nodeRef) {
407         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
408         final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
409
410             @Override
411             public Void call() throws Exception {
412                 final GetAllMeterConfigStatisticsInputBuilder builder =
413                         new GetAllMeterConfigStatisticsInputBuilder();
414                 builder.setNode(nodeRef);
415                 registrationRpcFutureCallBack(meterStatsService
416                         .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
417                 return null;
418             }
419         };
420         addGetAllStatJob(qetAllMeterConfStat);
421     }
422
423     @Override
424     public void getGroupFeaturesStat(final NodeRef nodeRef) {
425         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
426         final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
427
428             @Override
429             public Void call() throws Exception {
430                 /* RPC input */
431                 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
432                 input.setNode(nodeRef);
433                 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
434                 return null;
435             }
436         };
437         addStatJob(getGroupFeaturesStat);
438     }
439
440     @Override
441     public void getMeterFeaturesStat(final NodeRef nodeRef) {
442         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
443         final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
444
445             @Override
446             public Void call() throws Exception {
447                 /* RPC input */
448                 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
449                 input.setNode(nodeRef);
450                 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
451                 return null;
452             }
453         };
454         addStatJob(getMeterFeaturesStat);
455     }
456
457     @Override
458     public void getAllGroupsConfStats(final NodeRef nodeRef) {
459         Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
460         final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
461
462             @Override
463             public Void call() throws Exception {
464                 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
465                 final GetGroupDescriptionInputBuilder builder =
466                         new GetGroupDescriptionInputBuilder();
467                 builder.setNode(nodeRef);
468                 registrationRpcFutureCallBack(groupStatsService
469                         .getGroupDescription(builder.build()), null, nodeRef);
470
471                 return null;
472             }
473         };
474         addGetAllStatJob(getAllGropConfStat);
475     }
476
477     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
478
479         private final TransactionId id;
480         private final NodeId nId;
481         private final List<T> notifications;
482         private final Optional<? extends DataObject> confInput;
483
484         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
485             this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
486             notifications = new CopyOnWriteArrayList<T>();
487             confInput = Optional.fromNullable(input);
488             nId = nodeId;
489         }
490
491         @Override
492         public void addNotif(final T notif) {
493             notifications.add(notif);
494         }
495
496         @Override
497         public TransactionId getId() {
498             return id;
499         }
500
501         @Override
502         public NodeId getNodeId() {
503             return nId;
504         }
505
506         @Override
507         public List<T> getNotifications() {
508             return notifications;
509         }
510
511         @Override
512         public Optional<? extends DataObject> getConfInput() {
513             return confInput;
514         }
515     }
516 }
517