2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
20 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
21 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
67 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
69 * StatRpcMsgManagerImpl
 * The class registers and provides all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to the RPC Statistics Device Services, like getAllStatisticsFor...
 * The class also implements the process for joining multipart messages.
 * Internally the class uses two WeakHashMaps and a Guava Cache for holding the values needed to join multipart messages.
 * One weak map is used for holding all multipart messages and the second is used for the possible input
 * Config/DS light-weight DataObject (the DataObject contains only the necessary identification fields, such as
 * TableId, GroupId, MeterId or, for a flow, Match, Priority, FlowCookie, TableId and FlowId ...).
79 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
85 private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
87 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
89 private static final int MAX_CACHE_SIZE = 10000;
90 private static final int QUEUE_CAPACITY = 5000;
92 private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
93 private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
94 private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
96 * Number of possible statistic which are waiting for notification
97 * - check it in StatPermCollectorImpl method collectStatCrossNetwork()
99 private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
101 private final OpendaylightGroupStatisticsService groupStatsService;
102 private final OpendaylightMeterStatisticsService meterStatsService;
103 private final OpendaylightFlowStatisticsService flowStatsService;
104 private final OpendaylightPortStatisticsService portStatsService;
105 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
106 private final OpendaylightQueueStatisticsService queueStatsService;
108 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
110 private volatile boolean finishing = false;
112 public StatRpcMsgManagerImpl (final StatisticsManager manager,
113 final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
114 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
115 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
116 groupStatsService = Preconditions.checkNotNull(
117 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
118 "OpendaylightGroupStatisticsService can not be null!");
119 meterStatsService = Preconditions.checkNotNull(
120 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
121 "OpendaylightMeterStatisticsService can not be null!");
122 flowStatsService = Preconditions.checkNotNull(
123 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
124 "OpendaylightFlowStatisticsService can not be null!");
125 portStatsService = Preconditions.checkNotNull(
126 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
127 "OpendaylightPortStatisticsService can not be null!");
128 flowTableStatsService = Preconditions.checkNotNull(
129 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
130 "OpendaylightFlowTableStatisticsService can not be null!");
131 queueStatsService = Preconditions.checkNotNull(
132 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
133 "OpendaylightQueueStatisticsService can not be null!");
135 statsRpcJobQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
136 txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
137 .maximumSize(MAX_CACHE_SIZE).build();
141 public void close() {
143 statsRpcJobQueue = null;
148 /* Neverending cyle - wait for finishing */
149 while ( ! finishing) {
151 statsRpcJobQueue.take().call();
153 catch (final Exception e) {
154 LOG.warn("Stat Element RPC executor fail!", e);
157 // Drain all rpcCall, making sure any blocked threads are unblocked
158 while ( ! statsRpcJobQueue.isEmpty()) {
159 statsRpcJobQueue.poll();
163 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
164 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
166 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
170 private void addStatJob(final RpcJobsQueue getStatJob) {
171 final boolean success = statsRpcJobQueue.offer(getStatJob);
173 LOG.debug("Put RPC request for getStat fail! Queue is full.");
178 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
179 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
180 final SettableFuture<TransactionId> resultTransId) {
182 class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
184 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
185 final TransactionId id = result.getResult().getTransactionId();
186 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
188 String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
189 LOG.warn("Node [{}] does not support statistics request type : {}",
190 nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
192 if (resultTransId != null) {
193 resultTransId.set(id);
195 final String cacheKey = buildCacheKey(id, nodeKey.getId());
196 final TransactionCacheContainer<? super TransactionAware> container =
197 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
198 txCache.put(cacheKey, container);
203 public void onFailure(final Throwable t) {
204 LOG.warn("Response Registration for Statistics RPC call fail!", t);
209 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
212 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
213 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
217 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
218 final TransactionId id, final NodeId nodeId) {
219 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
220 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
222 final String key = buildCacheKey(id, nodeId);
223 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
225 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
228 public Void call() throws Exception {
229 final Optional<TransactionCacheContainer<?>> resultContainer =
230 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
231 if (resultContainer.isPresent()) {
232 txCache.invalidate(key);
234 result.set(resultContainer);
238 addStatJob(getTransactionCacheContainer);
243 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
244 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
245 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
247 final String key = buildCacheKey(id, nodeId);
248 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
250 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
253 public Void call() throws Exception {
254 final Optional<TransactionCacheContainer<?>> result =
255 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
256 checkStatId.set(Boolean.valueOf(result.isPresent()));
260 addStatJob(isExpecedStatistics);
265 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
266 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
267 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
269 final RpcJobsQueue addNotification = new RpcJobsQueue() {
272 public Void call() throws Exception {
273 final TransactionId txId = notification.getTransactionId();
274 final String key = buildCacheKey(txId, nodeId);
275 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
276 if (container != null) {
277 container.addNotif(notification);
282 addStatJob(addNotification);
286 public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
287 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
288 final SettableFuture<TransactionId> result = SettableFuture.create();
289 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
292 public Void call() throws Exception {
293 final GetAllGroupStatisticsInputBuilder builder =
294 new GetAllGroupStatisticsInputBuilder();
295 builder.setNode(nodeRef);
296 registrationRpcFutureCallBack(groupStatsService
297 .getAllGroupStatistics(builder.build()), null, nodeRef, result);
301 addGetAllStatJob(getAllGroupStat);
306 public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
307 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
308 final SettableFuture<TransactionId> result = SettableFuture.create();
309 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
312 public Void call() throws Exception {
313 final GetAllMeterStatisticsInputBuilder builder =
314 new GetAllMeterStatisticsInputBuilder();
315 builder.setNode(nodeRef);
316 registrationRpcFutureCallBack(meterStatsService
317 .getAllMeterStatistics(builder.build()), null, nodeRef, result);
321 addGetAllStatJob(getAllMeterStat);
326 public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
327 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
328 final SettableFuture<TransactionId> result = SettableFuture.create();
329 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
332 public Void call() throws Exception {
333 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
334 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
335 builder.setNode(nodeRef);
336 registrationRpcFutureCallBack(flowStatsService
337 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
341 addGetAllStatJob(getAllFlowStat);
346 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
347 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
348 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
349 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
352 public Void call() throws Exception {
353 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
354 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
355 builder.setNode(nodeRef);
356 builder.setTableId(tableId);
358 final TableBuilder tbuilder = new TableBuilder();
359 tbuilder.setId(tableId.getValue());
360 tbuilder.setKey(new TableKey(tableId.getValue()));
361 registrationRpcFutureCallBack(flowStatsService
362 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
366 addGetAllStatJob(getAggregateFlowStat);
370 public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
371 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
372 final SettableFuture<TransactionId> result = SettableFuture.create();
373 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
376 public Void call() throws Exception {
377 final GetAllNodeConnectorsStatisticsInputBuilder builder =
378 new GetAllNodeConnectorsStatisticsInputBuilder();
379 builder.setNode(nodeRef);
380 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
381 portStatsService.getAllNodeConnectorsStatistics(builder.build());
382 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
386 addGetAllStatJob(getAllPortsStat);
391 public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
392 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
393 final SettableFuture<TransactionId> result = SettableFuture.create();
394 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
397 public Void call() throws Exception {
398 final GetFlowTablesStatisticsInputBuilder builder =
399 new GetFlowTablesStatisticsInputBuilder();
400 builder.setNode(nodeRef);
401 registrationRpcFutureCallBack(flowTableStatsService
402 .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
406 addGetAllStatJob(getAllTableStat);
411 public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
412 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
413 final SettableFuture<TransactionId> result = SettableFuture.create();
414 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
417 public Void call() throws Exception {
418 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
419 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
420 builder.setNode(nodeRef);
421 registrationRpcFutureCallBack(queueStatsService
422 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
426 addGetAllStatJob(getAllQueueStat);
431 public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
432 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
433 final SettableFuture<TransactionId> result = SettableFuture.create();
434 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
437 public Void call() throws Exception {
438 final GetAllMeterConfigStatisticsInputBuilder builder =
439 new GetAllMeterConfigStatisticsInputBuilder();
440 builder.setNode(nodeRef);
441 registrationRpcFutureCallBack(meterStatsService
442 .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
446 addGetAllStatJob(qetAllMeterConfStat);
451 public void getGroupFeaturesStat(final NodeRef nodeRef) {
452 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
453 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
456 public Void call() throws Exception {
458 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
459 input.setNode(nodeRef);
460 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
464 addStatJob(getGroupFeaturesStat);
468 public void getMeterFeaturesStat(final NodeRef nodeRef) {
469 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
470 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
473 public Void call() throws Exception {
475 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
476 input.setNode(nodeRef);
477 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
481 addStatJob(getMeterFeaturesStat);
485 public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
486 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
487 final SettableFuture<TransactionId> result = SettableFuture.create();
488 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
491 public Void call() throws Exception {
492 final GetGroupDescriptionInputBuilder builder =
493 new GetGroupDescriptionInputBuilder();
494 builder.setNode(nodeRef);
495 registrationRpcFutureCallBack(groupStatsService
496 .getGroupDescription(builder.build()), null, nodeRef, result);
501 addGetAllStatJob(getAllGropConfStat);
505 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
507 private final TransactionId id;
508 private final NodeId nId;
509 private final List<T> notifications;
510 private final Optional<? extends DataObject> confInput;
/**
 * Creates an empty container for one transaction.
 *
 * @param id transaction id; must not be null
 * @param input optional input object; null is stored as an absent Optional
 * @param nodeId id of the node the transaction belongs to
 * @throws NullPointerException if {@code id} is null
 */
public <D extends DataObject> TransactionCacheContainerImpl(final TransactionId id, final D input,
        final NodeId nodeId) {
    this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
    // The final field nId must be assigned here; the visible body never did so.
    this.nId = nodeId;
    this.notifications = new CopyOnWriteArrayList<T>();
    this.confInput = Optional.fromNullable(input);
}
520 public void addNotif(final T notif) {
521 notifications.add(notif);
525 public TransactionId getId() {
530 public NodeId getNodeId() {
535 public List<T> getNotifications() {
536 return notifications;
540 public Optional<? extends DataObject> getConfInput() {