2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.List;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.CopyOnWriteArrayList;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
19 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 import com.google.common.base.Optional;
54 import com.google.common.base.Preconditions;
55 import com.google.common.cache.Cache;
56 import com.google.common.cache.CacheBuilder;
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
59 import com.google.common.util.concurrent.JdkFutureAdapters;
60 import com.google.common.util.concurrent.SettableFuture;
65 * org.opendaylight.controller.md.statistics.manager.impl
67 * StatRpcMsgManagerImpl
68 * Class register and provide all RPC Statistics Device Services and implement pre-defined
69 * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
71 * In next Class implement process for joining multipart messages.
72 * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
73 * One Weak map is used for holding all Multipart Messages and second is used for possible input
74 * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
75 * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
77 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
80 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
82 private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
84 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
86 private final int queueCapacity = 5000;
88 private final OpendaylightGroupStatisticsService groupStatsService;
89 private final OpendaylightMeterStatisticsService meterStatsService;
90 private final OpendaylightFlowStatisticsService flowStatsService;
91 private final OpendaylightPortStatisticsService portStatsService;
92 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
93 private final OpendaylightQueueStatisticsService queueStatsService;
95 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
97 private volatile boolean finishing = false;
99 public StatRpcMsgManagerImpl (final StatisticsManager manager,
100 final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
101 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
102 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
103 groupStatsService = Preconditions.checkNotNull(
104 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
105 "OpendaylightGroupStatisticsService can not be null!");
106 meterStatsService = Preconditions.checkNotNull(
107 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
108 "OpendaylightMeterStatisticsService can not be null!");
109 flowStatsService = Preconditions.checkNotNull(
110 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
111 "OpendaylightFlowStatisticsService can not be null!");
112 portStatsService = Preconditions.checkNotNull(
113 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
114 "OpendaylightPortStatisticsService can not be null!");
115 flowTableStatsService = Preconditions.checkNotNull(
116 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
117 "OpendaylightFlowTableStatisticsService can not be null!");
118 queueStatsService = Preconditions.checkNotNull(
119 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
120 "OpendaylightQueueStatisticsService can not be null!");
122 statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
123 /* nr. 7 is here nr. of possible statistic which are waiting for notification
124 * - check it in StatPermCollectorImpl method collectStatCrossNetwork */
125 txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
126 .maximumSize(10000).build();
130 public void close() {
132 statsRpcJobQueue = null;
137 /* Neverending cyle - wait for finishing */
138 while ( ! finishing) {
140 statsRpcJobQueue.take().call();
142 catch (final Exception e) {
143 LOG.warn("Stat Element RPC executor fail!", e);
146 // Drain all rpcCall, making sure any blocked threads are unblocked
147 while ( ! statsRpcJobQueue.isEmpty()) {
148 statsRpcJobQueue.poll();
152 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
153 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
155 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
159 private void addStatJob(final RpcJobsQueue getStatJob) {
160 final boolean success = statsRpcJobQueue.offer(getStatJob);
162 LOG.debug("Put RPC request for getStat fail! Queue is full.");
167 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
168 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
169 final SettableFuture<TransactionId> resultTransId) {
171 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
172 new FutureCallback<RpcResult<? extends TransactionAware>>() {
175 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
176 final TransactionId id = result.getResult().getTransactionId();
178 LOG.warn("No protocol support");
180 if (resultTransId != null) {
181 resultTransId.set(id);
183 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
184 final String cacheKey = buildCacheKey(id, nodeKey.getId());
185 final TransactionCacheContainer<? super TransactionAware> container =
186 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
187 txCache.put(cacheKey, container);
192 public void onFailure(final Throwable t) {
193 LOG.warn("Response Registration for Statistics RPC call fail!", t);
199 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
200 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
204 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
205 final TransactionId id, final NodeId nodeId) {
206 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
207 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
209 final String key = buildCacheKey(id, nodeId);
210 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
212 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
215 public Void call() throws Exception {
216 final Optional<TransactionCacheContainer<?>> resultContainer =
217 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
218 if (resultContainer.isPresent()) {
219 txCache.invalidate(key);
221 result.set(resultContainer);
225 addStatJob(getTransactionCacheContainer);
230 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
231 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
232 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
234 final String key = buildCacheKey(id, nodeId);
235 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
237 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
240 public Void call() throws Exception {
241 final Optional<TransactionCacheContainer<?>> result =
242 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
243 checkStatId.set(Boolean.valueOf(result.isPresent()));
247 addStatJob(isExpecedStatistics);
252 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
253 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
254 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
256 final RpcJobsQueue addNotification = new RpcJobsQueue() {
259 public Void call() throws Exception {
260 final TransactionId txId = notification.getTransactionId();
261 final String key = buildCacheKey(txId, nodeId);
262 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
263 if (container != null) {
264 container.addNotif(notification);
269 addStatJob(addNotification);
273 public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
274 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
275 final SettableFuture<TransactionId> result = SettableFuture.create();
276 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
279 public Void call() throws Exception {
280 final GetAllGroupStatisticsInputBuilder builder =
281 new GetAllGroupStatisticsInputBuilder();
282 builder.setNode(nodeRef);
283 registrationRpcFutureCallBack(groupStatsService
284 .getAllGroupStatistics(builder.build()), null, nodeRef, result);
288 addGetAllStatJob(getAllGroupStat);
293 public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
294 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
295 final SettableFuture<TransactionId> result = SettableFuture.create();
296 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
299 public Void call() throws Exception {
300 final GetAllMeterStatisticsInputBuilder builder =
301 new GetAllMeterStatisticsInputBuilder();
302 builder.setNode(nodeRef);
303 registrationRpcFutureCallBack(meterStatsService
304 .getAllMeterStatistics(builder.build()), null, nodeRef, result);
308 addGetAllStatJob(getAllMeterStat);
313 public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
314 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
315 final SettableFuture<TransactionId> result = SettableFuture.create();
316 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
319 public Void call() throws Exception {
320 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
321 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
322 builder.setNode(nodeRef);
323 registrationRpcFutureCallBack(flowStatsService
324 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
328 addGetAllStatJob(getAllFlowStat);
333 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
334 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
335 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
336 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
339 public Void call() throws Exception {
340 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
341 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
342 builder.setNode(nodeRef);
343 builder.setTableId(tableId);
345 final TableBuilder tbuilder = new TableBuilder();
346 tbuilder.setId(tableId.getValue());
347 tbuilder.setKey(new TableKey(tableId.getValue()));
348 registrationRpcFutureCallBack(flowStatsService
349 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
353 addGetAllStatJob(getAggregateFlowStat);
357 public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
358 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
359 final SettableFuture<TransactionId> result = SettableFuture.create();
360 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
363 public Void call() throws Exception {
364 final GetAllNodeConnectorsStatisticsInputBuilder builder =
365 new GetAllNodeConnectorsStatisticsInputBuilder();
366 builder.setNode(nodeRef);
367 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
368 portStatsService.getAllNodeConnectorsStatistics(builder.build());
369 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
373 addGetAllStatJob(getAllPortsStat);
378 public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
379 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
380 final SettableFuture<TransactionId> result = SettableFuture.create();
381 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
384 public Void call() throws Exception {
385 final GetFlowTablesStatisticsInputBuilder builder =
386 new GetFlowTablesStatisticsInputBuilder();
387 builder.setNode(nodeRef);
388 registrationRpcFutureCallBack(flowTableStatsService
389 .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
393 addGetAllStatJob(getAllTableStat);
398 public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
399 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
400 final SettableFuture<TransactionId> result = SettableFuture.create();
401 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
404 public Void call() throws Exception {
405 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
406 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
407 builder.setNode(nodeRef);
408 registrationRpcFutureCallBack(queueStatsService
409 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
413 addGetAllStatJob(getAllQueueStat);
418 public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
419 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
420 final SettableFuture<TransactionId> result = SettableFuture.create();
421 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
424 public Void call() throws Exception {
425 final GetAllMeterConfigStatisticsInputBuilder builder =
426 new GetAllMeterConfigStatisticsInputBuilder();
427 builder.setNode(nodeRef);
428 registrationRpcFutureCallBack(meterStatsService
429 .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
433 addGetAllStatJob(qetAllMeterConfStat);
438 public void getGroupFeaturesStat(final NodeRef nodeRef) {
439 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
440 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
443 public Void call() throws Exception {
445 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
446 input.setNode(nodeRef);
447 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
451 addStatJob(getGroupFeaturesStat);
455 public void getMeterFeaturesStat(final NodeRef nodeRef) {
456 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
457 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
460 public Void call() throws Exception {
462 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
463 input.setNode(nodeRef);
464 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
468 addStatJob(getMeterFeaturesStat);
472 public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
473 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
474 final SettableFuture<TransactionId> result = SettableFuture.create();
475 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
478 public Void call() throws Exception {
479 final GetGroupDescriptionInputBuilder builder =
480 new GetGroupDescriptionInputBuilder();
481 builder.setNode(nodeRef);
482 registrationRpcFutureCallBack(groupStatsService
483 .getGroupDescription(builder.build()), null, nodeRef, result);
488 addGetAllStatJob(getAllGropConfStat);
492 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
494 private final TransactionId id;
495 private final NodeId nId;
496 private final List<T> notifications;
497 private final Optional<? extends DataObject> confInput;
499 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
500 this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
501 notifications = new CopyOnWriteArrayList<T>();
502 confInput = Optional.fromNullable(input);
507 public void addNotif(final T notif) {
508 notifications.add(notif);
512 public TransactionId getId() {
517 public NodeId getNodeId() {
522 public List<T> getNotifications() {
523 return notifications;
527 public Optional<? extends DataObject> getConfInput() {