2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.List;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.CopyOnWriteArrayList;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
19 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
20 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
21 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
22 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.base.Optional;
55 import com.google.common.base.Preconditions;
56 import com.google.common.cache.Cache;
57 import com.google.common.cache.CacheBuilder;
58 import com.google.common.util.concurrent.FutureCallback;
59 import com.google.common.util.concurrent.Futures;
60 import com.google.common.util.concurrent.JdkFutureAdapters;
61 import com.google.common.util.concurrent.SettableFuture;
66 * org.opendaylight.controller.md.statistics.manager.impl
68 * StatRpcMsgManagerImpl
 * The class registers and provides all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to the RPC Statistics Device Services, like getAllStatisticsFor...
 * The class also implements the process for joining multipart messages.
 * Internally the class uses two WeakHashMaps and a Guava Cache for holding the values needed to join multipart messages.
 * One weak map is used for holding all multipart messages and the second is used for the possible input
 * Config/DS light-weight DataObject (the DataObject contains only the necessary identification fields, such as
 * TableId, GroupId, MeterId or, for a flow, Match, Priority, FlowCookie, TableId and FlowId ...
78 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
81 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
83 private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
85 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
87 private final long maxLifeForRequest = 50; /* 50 second */
88 private final int queueCapacity = 5000;
89 private final StatisticsManager manager;
91 private final OpendaylightGroupStatisticsService groupStatsService;
92 private final OpendaylightMeterStatisticsService meterStatsService;
93 private final OpendaylightFlowStatisticsService flowStatsService;
94 private final OpendaylightPortStatisticsService portStatsService;
95 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
96 private final OpendaylightQueueStatisticsService queueStatsService;
98 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
100 private volatile boolean finishing = false;
/**
 * Creates the manager: stores the owning statistics manager, resolves every statistics RPC
 * service from the registry and allocates the job queue and the transaction cache.
 *
 * @param manager statistics manager this instance belongs to; must not be null
 * @param rpcRegistry registry the statistics RPC services are looked up in; must not be null
 * @param minReqNetMonitInt not read by this constructor
 * @throws NullPointerException if {@code manager} or any of the looked-up services is null
 * @throws IllegalArgumentException if {@code rpcRegistry} is null
 */
public StatRpcMsgManagerImpl (final StatisticsManager manager,
        final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
    this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
    Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");

    // every service is mandatory - a missing one fails the construction
    this.groupStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
            "OpendaylightGroupStatisticsService can not be null!");
    this.meterStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
            "OpendaylightMeterStatisticsService can not be null!");
    this.flowStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
            "OpendaylightFlowStatisticsService can not be null!");
    this.portStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
            "OpendaylightPortStatisticsService can not be null!");
    this.flowTableStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
            "OpendaylightFlowTableStatisticsService can not be null!");
    this.queueStatsService = Preconditions.checkNotNull(
            rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
            "OpendaylightQueueStatisticsService can not be null!");

    this.statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
    this.txCache = CacheBuilder.newBuilder()
            .expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
            .maximumSize(10000)
            .build();
}
131 public void close() {
133 statsRpcJobQueue = null;
138 /* Neverending cyle - wait for finishing */
139 while ( ! finishing) {
141 statsRpcJobQueue.take().call();
143 catch (final Exception e) {
144 LOG.warn("Stat Element RPC executor fail!", e);
147 // Drain all rpcCall, making sure any blocked threads are unblocked
148 while ( ! statsRpcJobQueue.isEmpty()) {
149 statsRpcJobQueue.poll();
153 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
154 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
156 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
160 private void addStatJob(final RpcJobsQueue getStatJob) {
161 final boolean success = statsRpcJobQueue.offer(getStatJob);
163 LOG.debug("Put RPC request for getStat fail! Queue is full.");
168 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
169 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
171 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
172 new FutureCallback<RpcResult<? extends TransactionAware>>() {
175 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
176 final TransactionId id = result.getResult().getTransactionId();
178 LOG.warn("No protocol support");
180 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
181 final String cacheKey = buildCacheKey(id, nodeKey.getId());
182 final TransactionCacheContainer<? super TransactionAware> container =
183 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
184 txCache.put(cacheKey, container);
189 public void onFailure(final Throwable t) {
190 LOG.warn("Response Registration for Statistics RPC call fail!", t);
196 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
197 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
201 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
202 final TransactionId id, final NodeId nodeId) {
203 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
204 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
206 final String key = buildCacheKey(id, nodeId);
207 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
209 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
212 public Void call() throws Exception {
213 final Optional<TransactionCacheContainer<?>> resultContainer =
214 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
215 if (resultContainer.isPresent()) {
216 txCache.invalidate(key);
218 result.set(resultContainer);
222 addStatJob(getTransactionCacheContainer);
227 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
228 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
229 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
231 final String key = buildCacheKey(id, nodeId);
232 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
234 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
237 public Void call() throws Exception {
238 final Optional<TransactionCacheContainer<?>> result =
239 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
240 checkStatId.set(Boolean.valueOf(result.isPresent()));
244 addStatJob(isExpecedStatistics);
249 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
250 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
251 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
253 final RpcJobsQueue addNotification = new RpcJobsQueue() {
256 public Void call() throws Exception {
257 final TransactionId txId = notification.getTransactionId();
258 final String key = buildCacheKey(txId, nodeId);
259 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
260 if (container != null) {
261 container.addNotif(notification);
266 addStatJob(addNotification);
270 public void getAllGroupsStat(final NodeRef nodeRef) {
271 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
272 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
275 public Void call() throws Exception {
276 final GetAllGroupStatisticsInputBuilder builder =
277 new GetAllGroupStatisticsInputBuilder();
278 builder.setNode(nodeRef);
279 registrationRpcFutureCallBack(groupStatsService
280 .getAllGroupStatistics(builder.build()), null, nodeRef);
284 addGetAllStatJob(getAllGroupStat);
288 public void getAllMetersStat(final NodeRef nodeRef) {
289 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
290 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
293 public Void call() throws Exception {
294 final GetAllMeterStatisticsInputBuilder builder =
295 new GetAllMeterStatisticsInputBuilder();
296 builder.setNode(nodeRef);
297 registrationRpcFutureCallBack(meterStatsService
298 .getAllMeterStatistics(builder.build()), null, nodeRef);
302 addGetAllStatJob(getAllMeterStat);
306 public void getAllFlowsStat(final NodeRef nodeRef) {
307 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
308 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
311 public Void call() throws Exception {
312 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
313 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
314 builder.setNode(nodeRef);
315 registrationRpcFutureCallBack(flowStatsService
316 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
320 addGetAllStatJob(getAllFlowStat);
324 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
326 manager.enqueue(new StatDataStoreOperation() {
329 public void applyOperation(final ReadWriteTransaction tx) {
330 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
332 public Void call() throws Exception {
333 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
334 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
335 builder.setNode(nodeRef);
336 builder.setTableId(tableId);
338 final TableBuilder tbuilder = new TableBuilder();
339 tbuilder.setId(tableId.getValue());
340 tbuilder.setKey(new TableKey(tableId.getValue()));
341 registrationRpcFutureCallBack(flowStatsService
342 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
346 addGetAllStatJob(getAggregateFlowStat);
352 public void getAllPortsStat(final NodeRef nodeRef) {
353 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
354 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
357 public Void call() throws Exception {
358 final GetAllNodeConnectorsStatisticsInputBuilder builder =
359 new GetAllNodeConnectorsStatisticsInputBuilder();
360 builder.setNode(nodeRef);
361 registrationRpcFutureCallBack(portStatsService
362 .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
366 addGetAllStatJob(getAllPortsStat);
370 public void getAllTablesStat(final NodeRef nodeRef) {
371 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
372 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
375 public Void call() throws Exception {
376 final GetFlowTablesStatisticsInputBuilder builder =
377 new GetFlowTablesStatisticsInputBuilder();
378 builder.setNode(nodeRef);
379 registrationRpcFutureCallBack(flowTableStatsService
380 .getFlowTablesStatistics(builder.build()), null, nodeRef);
384 addGetAllStatJob(getAllTableStat);
388 public void getAllQueueStat(final NodeRef nodeRef) {
389 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
390 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
393 public Void call() throws Exception {
394 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
395 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
396 builder.setNode(nodeRef);
397 registrationRpcFutureCallBack(queueStatsService
398 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
402 addGetAllStatJob(getAllQueueStat);
406 public void getAllMeterConfigStat(final NodeRef nodeRef) {
407 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
408 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
411 public Void call() throws Exception {
412 final GetAllMeterConfigStatisticsInputBuilder builder =
413 new GetAllMeterConfigStatisticsInputBuilder();
414 builder.setNode(nodeRef);
415 registrationRpcFutureCallBack(meterStatsService
416 .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
420 addGetAllStatJob(qetAllMeterConfStat);
424 public void getGroupFeaturesStat(final NodeRef nodeRef) {
425 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
426 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
429 public Void call() throws Exception {
431 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
432 input.setNode(nodeRef);
433 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
437 addStatJob(getGroupFeaturesStat);
441 public void getMeterFeaturesStat(final NodeRef nodeRef) {
442 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
443 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
446 public Void call() throws Exception {
448 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
449 input.setNode(nodeRef);
450 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
454 addStatJob(getMeterFeaturesStat);
458 public void getAllGroupsConfStats(final NodeRef nodeRef) {
459 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
460 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
463 public Void call() throws Exception {
464 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
465 final GetGroupDescriptionInputBuilder builder =
466 new GetGroupDescriptionInputBuilder();
467 builder.setNode(nodeRef);
468 registrationRpcFutureCallBack(groupStatsService
469 .getGroupDescription(builder.build()), null, nodeRef);
474 addGetAllStatJob(getAllGropConfStat);
477 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
479 private final TransactionId id;
480 private final NodeId nId;
481 private final List<T> notifications;
482 private final Optional<? extends DataObject> confInput;
484 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
485 this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
486 notifications = new CopyOnWriteArrayList<T>();
487 confInput = Optional.fromNullable(input);
492 public void addNotif(final T notif) {
493 notifications.add(notif);
497 public TransactionId getId() {
502 public NodeId getNodeId() {
507 public List<T> getNotifications() {
508 return notifications;
512 public Optional<? extends DataObject> getConfInput() {