2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
20 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
21 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
67 * org.opendaylight.controller.md.statistics.manager.impl
69 * StatRpcMsgManagerImpl
70 * Class register and provide all RPC Statistics Device Services and implement pre-defined
71 * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
73 * In next Class implement process for joining multipart messages.
74 * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
75 * One Weak map is used for holding all Multipart Messages and second is used for possible input
76 * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
77 * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
79 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
84 private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
86 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
88 private final int queueCapacity = 5000;
90 private final OpendaylightGroupStatisticsService groupStatsService;
91 private final OpendaylightMeterStatisticsService meterStatsService;
92 private final OpendaylightFlowStatisticsService flowStatsService;
93 private final OpendaylightPortStatisticsService portStatsService;
94 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
95 private final OpendaylightQueueStatisticsService queueStatsService;
97 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
99 private volatile boolean finishing = false;
101 public StatRpcMsgManagerImpl (final StatisticsManager manager,
102 final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
103 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
104 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
105 groupStatsService = Preconditions.checkNotNull(
106 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
107 "OpendaylightGroupStatisticsService can not be null!");
108 meterStatsService = Preconditions.checkNotNull(
109 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
110 "OpendaylightMeterStatisticsService can not be null!");
111 flowStatsService = Preconditions.checkNotNull(
112 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
113 "OpendaylightFlowStatisticsService can not be null!");
114 portStatsService = Preconditions.checkNotNull(
115 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
116 "OpendaylightPortStatisticsService can not be null!");
117 flowTableStatsService = Preconditions.checkNotNull(
118 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
119 "OpendaylightFlowTableStatisticsService can not be null!");
120 queueStatsService = Preconditions.checkNotNull(
121 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
122 "OpendaylightQueueStatisticsService can not be null!");
124 statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
125 /* nr. 7 is here nr. of possible statistic which are waiting for notification
126 * - check it in StatPermCollectorImpl method collectStatCrossNetwork */
127 txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
128 .maximumSize(10000).build();
132 public void close() {
134 statsRpcJobQueue = null;
139 /* Neverending cyle - wait for finishing */
140 while ( ! finishing) {
142 statsRpcJobQueue.take().call();
144 catch (final Exception e) {
145 LOG.warn("Stat Element RPC executor fail!", e);
148 // Drain all rpcCall, making sure any blocked threads are unblocked
149 while ( ! statsRpcJobQueue.isEmpty()) {
150 statsRpcJobQueue.poll();
154 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
155 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
157 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
161 private void addStatJob(final RpcJobsQueue getStatJob) {
162 final boolean success = statsRpcJobQueue.offer(getStatJob);
164 LOG.debug("Put RPC request for getStat fail! Queue is full.");
169 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
170 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
171 final SettableFuture<TransactionId> resultTransId) {
173 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
174 new FutureCallback<RpcResult<? extends TransactionAware>>() {
177 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
178 final TransactionId id = result.getResult().getTransactionId();
179 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
181 String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
182 LOG.warn("Node [{}] does not support statistics request type : {}",
183 nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
185 if (resultTransId != null) {
186 resultTransId.set(id);
188 final String cacheKey = buildCacheKey(id, nodeKey.getId());
189 final TransactionCacheContainer<? super TransactionAware> container =
190 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
191 txCache.put(cacheKey, container);
196 public void onFailure(final Throwable t) {
197 LOG.warn("Response Registration for Statistics RPC call fail!", t);
203 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
204 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
208 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
209 final TransactionId id, final NodeId nodeId) {
210 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
211 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
213 final String key = buildCacheKey(id, nodeId);
214 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
216 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
219 public Void call() throws Exception {
220 final Optional<TransactionCacheContainer<?>> resultContainer =
221 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
222 if (resultContainer.isPresent()) {
223 txCache.invalidate(key);
225 result.set(resultContainer);
229 addStatJob(getTransactionCacheContainer);
234 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
235 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
236 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
238 final String key = buildCacheKey(id, nodeId);
239 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
241 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
244 public Void call() throws Exception {
245 final Optional<TransactionCacheContainer<?>> result =
246 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
247 checkStatId.set(Boolean.valueOf(result.isPresent()));
251 addStatJob(isExpecedStatistics);
256 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
257 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
258 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
260 final RpcJobsQueue addNotification = new RpcJobsQueue() {
263 public Void call() throws Exception {
264 final TransactionId txId = notification.getTransactionId();
265 final String key = buildCacheKey(txId, nodeId);
266 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
267 if (container != null) {
268 container.addNotif(notification);
273 addStatJob(addNotification);
277 public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
278 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
279 final SettableFuture<TransactionId> result = SettableFuture.create();
280 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
283 public Void call() throws Exception {
284 final GetAllGroupStatisticsInputBuilder builder =
285 new GetAllGroupStatisticsInputBuilder();
286 builder.setNode(nodeRef);
287 registrationRpcFutureCallBack(groupStatsService
288 .getAllGroupStatistics(builder.build()), null, nodeRef, result);
292 addGetAllStatJob(getAllGroupStat);
297 public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
298 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
299 final SettableFuture<TransactionId> result = SettableFuture.create();
300 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
303 public Void call() throws Exception {
304 final GetAllMeterStatisticsInputBuilder builder =
305 new GetAllMeterStatisticsInputBuilder();
306 builder.setNode(nodeRef);
307 registrationRpcFutureCallBack(meterStatsService
308 .getAllMeterStatistics(builder.build()), null, nodeRef, result);
312 addGetAllStatJob(getAllMeterStat);
317 public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
318 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
319 final SettableFuture<TransactionId> result = SettableFuture.create();
320 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
323 public Void call() throws Exception {
324 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
325 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
326 builder.setNode(nodeRef);
327 registrationRpcFutureCallBack(flowStatsService
328 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
332 addGetAllStatJob(getAllFlowStat);
337 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
338 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
339 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
340 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
343 public Void call() throws Exception {
344 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
345 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
346 builder.setNode(nodeRef);
347 builder.setTableId(tableId);
349 final TableBuilder tbuilder = new TableBuilder();
350 tbuilder.setId(tableId.getValue());
351 tbuilder.setKey(new TableKey(tableId.getValue()));
352 registrationRpcFutureCallBack(flowStatsService
353 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
357 addGetAllStatJob(getAggregateFlowStat);
361 public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
362 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
363 final SettableFuture<TransactionId> result = SettableFuture.create();
364 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
367 public Void call() throws Exception {
368 final GetAllNodeConnectorsStatisticsInputBuilder builder =
369 new GetAllNodeConnectorsStatisticsInputBuilder();
370 builder.setNode(nodeRef);
371 final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
372 portStatsService.getAllNodeConnectorsStatistics(builder.build());
373 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
377 addGetAllStatJob(getAllPortsStat);
382 public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
383 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
384 final SettableFuture<TransactionId> result = SettableFuture.create();
385 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
388 public Void call() throws Exception {
389 final GetFlowTablesStatisticsInputBuilder builder =
390 new GetFlowTablesStatisticsInputBuilder();
391 builder.setNode(nodeRef);
392 registrationRpcFutureCallBack(flowTableStatsService
393 .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
397 addGetAllStatJob(getAllTableStat);
402 public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
403 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
404 final SettableFuture<TransactionId> result = SettableFuture.create();
405 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
408 public Void call() throws Exception {
409 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
410 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
411 builder.setNode(nodeRef);
412 registrationRpcFutureCallBack(queueStatsService
413 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
417 addGetAllStatJob(getAllQueueStat);
422 public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
423 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
424 final SettableFuture<TransactionId> result = SettableFuture.create();
425 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
428 public Void call() throws Exception {
429 final GetAllMeterConfigStatisticsInputBuilder builder =
430 new GetAllMeterConfigStatisticsInputBuilder();
431 builder.setNode(nodeRef);
432 registrationRpcFutureCallBack(meterStatsService
433 .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
437 addGetAllStatJob(qetAllMeterConfStat);
442 public void getGroupFeaturesStat(final NodeRef nodeRef) {
443 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
444 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
447 public Void call() throws Exception {
449 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
450 input.setNode(nodeRef);
451 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
455 addStatJob(getGroupFeaturesStat);
459 public void getMeterFeaturesStat(final NodeRef nodeRef) {
460 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
461 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
464 public Void call() throws Exception {
466 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
467 input.setNode(nodeRef);
468 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
472 addStatJob(getMeterFeaturesStat);
476 public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
477 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
478 final SettableFuture<TransactionId> result = SettableFuture.create();
479 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
482 public Void call() throws Exception {
483 final GetGroupDescriptionInputBuilder builder =
484 new GetGroupDescriptionInputBuilder();
485 builder.setNode(nodeRef);
486 registrationRpcFutureCallBack(groupStatsService
487 .getGroupDescription(builder.build()), null, nodeRef, result);
492 addGetAllStatJob(getAllGropConfStat);
496 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
498 private final TransactionId id;
499 private final NodeId nId;
500 private final List<T> notifications;
501 private final Optional<? extends DataObject> confInput;
503 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
504 this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
505 notifications = new CopyOnWriteArrayList<T>();
506 confInput = Optional.fromNullable(input);
511 public void addNotif(final T notif) {
512 notifications.add(notif);
516 public TransactionId getId() {
521 public NodeId getNodeId() {
526 public List<T> getNotifications() {
527 return notifications;
531 public Optional<? extends DataObject> getConfInput() {