2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import java.math.BigInteger;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
21 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
22 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
23 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import com.google.common.base.Joiner;
57 import com.google.common.base.Optional;
58 import com.google.common.base.Preconditions;
59 import com.google.common.cache.Cache;
60 import com.google.common.cache.CacheBuilder;
61 import com.google.common.util.concurrent.FutureCallback;
62 import com.google.common.util.concurrent.Futures;
63 import com.google.common.util.concurrent.JdkFutureAdapters;
64 import com.google.common.util.concurrent.SettableFuture;
69 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
71 * StatRpcMsgManagerImpl
72 * Class register and provide all RPC Statistics Device Services and implement pre-defined
73 * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
75 * In next Class implement process for joining multipart messages.
76 * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
77 * One Weak map is used for holding all Multipart Messages and second is used for possible input
78 * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
79 * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
81 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
84 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
87 private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
89 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
91 private static final int MAX_CACHE_SIZE = 10000;
92 private static final int QUEUE_CAPACITY = 5000;
94 private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
95 private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
96 private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
98 * Number of possible statistic which are waiting for notification
99 * - check it in StatPermCollectorImpl method collectStatCrossNetwork()
101 private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
103 private final OpendaylightGroupStatisticsService groupStatsService;
104 private final OpendaylightMeterStatisticsService meterStatsService;
105 private final OpendaylightFlowStatisticsService flowStatsService;
106 private final OpendaylightPortStatisticsService portStatsService;
107 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
108 private final OpendaylightQueueStatisticsService queueStatsService;
110 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
112 private volatile boolean finishing = false;
114 public StatRpcMsgManagerImpl (final StatisticsManager manager,
115 final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
116 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
117 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
118 groupStatsService = Preconditions.checkNotNull(
119 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
120 "OpendaylightGroupStatisticsService can not be null!");
121 meterStatsService = Preconditions.checkNotNull(
122 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
123 "OpendaylightMeterStatisticsService can not be null!");
124 flowStatsService = Preconditions.checkNotNull(
125 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
126 "OpendaylightFlowStatisticsService can not be null!");
127 portStatsService = Preconditions.checkNotNull(
128 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
129 "OpendaylightPortStatisticsService can not be null!");
130 flowTableStatsService = Preconditions.checkNotNull(
131 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
132 "OpendaylightFlowTableStatisticsService can not be null!");
133 queueStatsService = Preconditions.checkNotNull(
134 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
135 "OpendaylightQueueStatisticsService can not be null!");
137 statsRpcJobQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
138 txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
139 .maximumSize(MAX_CACHE_SIZE).build();
143 public void close() {
145 statsRpcJobQueue = null;
150 /* Neverending cyle - wait for finishing */
151 while ( ! finishing) {
153 statsRpcJobQueue.take().call();
155 catch (final Exception e) {
156 LOG.warn("Stat Element RPC executor fail!", e);
159 // Drain all rpcCall, making sure any blocked threads are unblocked
160 while ( ! statsRpcJobQueue.isEmpty()) {
161 statsRpcJobQueue.poll();
165 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
166 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
168 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
172 private void addStatJob(final RpcJobsQueue getStatJob) {
173 final boolean success = statsRpcJobQueue.offer(getStatJob);
175 LOG.debug("Put RPC request for getStat fail! Queue is full.");
180 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
181 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
182 final SettableFuture<TransactionId> resultTransId) {
184 class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
186 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
187 final TransactionId id = result.getResult().getTransactionId();
188 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
190 String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
191 LOG.warn("Node [{}] does not support statistics request type : {}",
192 nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
193 if (resultTransId != null) {
194 resultTransId.setException(
195 new UnsupportedOperationException());
198 if (resultTransId != null) {
199 resultTransId.set(id);
201 final String cacheKey = buildCacheKey(id, nodeKey.getId());
202 final TransactionCacheContainer<? super TransactionAware> container =
203 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
204 txCache.put(cacheKey, container);
209 public void onFailure(final Throwable t) {
210 LOG.warn("Response Registration for Statistics RPC call fail!", t);
211 if (resultTransId != null) {
212 if (t instanceof DOMRpcImplementationNotAvailableException) {
213 //If encountered with RPC not availabe exception, retry till
214 // stats manager remove the node from the stats collector pool
215 resultTransId.set(StatPermCollectorImpl.getFakeTxId());
217 resultTransId.setException(t);
223 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
226 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
227 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
231 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
232 final TransactionId id, final NodeId nodeId) {
233 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
234 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
236 final String key = buildCacheKey(id, nodeId);
237 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
239 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
242 public Void call() throws Exception {
243 final Optional<TransactionCacheContainer<?>> resultContainer =
244 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
245 if (resultContainer.isPresent()) {
246 txCache.invalidate(key);
248 result.set(resultContainer);
252 addStatJob(getTransactionCacheContainer);
257 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
258 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
259 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
261 final String key = buildCacheKey(id, nodeId);
262 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
264 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
267 public Void call() throws Exception {
268 final Optional<TransactionCacheContainer<?>> result =
269 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
270 checkStatId.set(Boolean.valueOf(result.isPresent()));
274 addStatJob(isExpecedStatistics);
279 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
280 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
281 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
283 final RpcJobsQueue addNotification = new RpcJobsQueue() {
286 public Void call() throws Exception {
287 final TransactionId txId = notification.getTransactionId();
288 final String key = buildCacheKey(txId, nodeId);
289 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
290 if (container != null) {
291 container.addNotif(notification);
296 addStatJob(addNotification);
300 public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
301 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
302 final SettableFuture<TransactionId> result = SettableFuture.create();
303 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
306 public Void call() throws Exception {
307 final GetAllGroupStatisticsInputBuilder builder =
308 new GetAllGroupStatisticsInputBuilder();
309 builder.setNode(nodeRef);
310 registrationRpcFutureCallBack(groupStatsService
311 .getAllGroupStatistics(builder.build()), null, nodeRef, result);
315 addGetAllStatJob(getAllGroupStat);
320 public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
321 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
322 final SettableFuture<TransactionId> result = SettableFuture.create();
323 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
326 public Void call() throws Exception {
327 final GetAllMeterStatisticsInputBuilder builder =
328 new GetAllMeterStatisticsInputBuilder();
329 builder.setNode(nodeRef);
330 registrationRpcFutureCallBack(meterStatsService
331 .getAllMeterStatistics(builder.build()), null, nodeRef, result);
335 addGetAllStatJob(getAllMeterStat);
340 public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
341 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
342 final SettableFuture<TransactionId> result = SettableFuture.create();
343 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
346 public Void call() throws Exception {
347 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
348 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
349 builder.setNode(nodeRef);
350 registrationRpcFutureCallBack(flowStatsService
351 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
355 addGetAllStatJob(getAllFlowStat);
360 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
361 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
362 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
363 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
366 public Void call() throws Exception {
367 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
368 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
369 builder.setNode(nodeRef);
370 builder.setTableId(tableId);
372 final TableBuilder tbuilder = new TableBuilder();
373 tbuilder.setId(tableId.getValue());
374 tbuilder.setKey(new TableKey(tableId.getValue()));
375 registrationRpcFutureCallBack(flowStatsService
376 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
380 addGetAllStatJob(getAggregateFlowStat);
384 public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
385 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
386 final SettableFuture<TransactionId> result = SettableFuture.create();
387 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
390 public Void call() throws Exception {
391 final GetAllNodeConnectorsStatisticsInputBuilder builder =
392 new GetAllNodeConnectorsStatisticsInputBuilder();
393 builder.setNode(nodeRef);
394 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
395 portStatsService.getAllNodeConnectorsStatistics(builder.build());
396 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
400 addGetAllStatJob(getAllPortsStat);
405 public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
406 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
407 final SettableFuture<TransactionId> result = SettableFuture.create();
408 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
411 public Void call() throws Exception {
412 final GetFlowTablesStatisticsInputBuilder builder =
413 new GetFlowTablesStatisticsInputBuilder();
414 builder.setNode(nodeRef);
415 registrationRpcFutureCallBack(flowTableStatsService
416 .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
420 addGetAllStatJob(getAllTableStat);
425 public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
426 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
427 final SettableFuture<TransactionId> result = SettableFuture.create();
428 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
431 public Void call() throws Exception {
432 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
433 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
434 builder.setNode(nodeRef);
435 registrationRpcFutureCallBack(queueStatsService
436 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
440 addGetAllStatJob(getAllQueueStat);
445 public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
446 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
447 final SettableFuture<TransactionId> result = SettableFuture.create();
448 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
451 public Void call() throws Exception {
452 final GetAllMeterConfigStatisticsInputBuilder builder =
453 new GetAllMeterConfigStatisticsInputBuilder();
454 builder.setNode(nodeRef);
455 registrationRpcFutureCallBack(meterStatsService
456 .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
460 addGetAllStatJob(qetAllMeterConfStat);
465 public void getGroupFeaturesStat(final NodeRef nodeRef) {
466 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
467 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
470 public Void call() throws Exception {
472 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
473 input.setNode(nodeRef);
474 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
478 addStatJob(getGroupFeaturesStat);
482 public void getMeterFeaturesStat(final NodeRef nodeRef) {
483 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
484 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
487 public Void call() throws Exception {
489 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
490 input.setNode(nodeRef);
491 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
495 addStatJob(getMeterFeaturesStat);
499 public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
500 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
501 final SettableFuture<TransactionId> result = SettableFuture.create();
502 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
505 public Void call() throws Exception {
506 final GetGroupDescriptionInputBuilder builder =
507 new GetGroupDescriptionInputBuilder();
508 builder.setNode(nodeRef);
509 registrationRpcFutureCallBack(groupStatsService
510 .getGroupDescription(builder.build()), null, nodeRef, result);
515 addGetAllStatJob(getAllGropConfStat);
519 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
521 private final TransactionId id;
522 private final NodeId nId;
523 private final List<T> notifications;
524 private final Optional<? extends DataObject> confInput;
526 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
527 this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
528 notifications = new CopyOnWriteArrayList<T>();
529 confInput = Optional.fromNullable(input);
534 public void addNotif(final T notif) {
535 notifications.add(notif);
539 public TransactionId getId() {
544 public NodeId getNodeId() {
549 public List<T> getNotifications() {
550 return notifications;
554 public Optional<? extends DataObject> getConfInput() {