2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.List;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.CopyOnWriteArrayList;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
19 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
47 import org.opendaylight.yangtools.yang.binding.DataObject;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import com.google.common.base.Optional;
53 import com.google.common.base.Preconditions;
54 import com.google.common.cache.Cache;
55 import com.google.common.cache.CacheBuilder;
56 import com.google.common.util.concurrent.FutureCallback;
57 import com.google.common.util.concurrent.Futures;
58 import com.google.common.util.concurrent.JdkFutureAdapters;
59 import com.google.common.util.concurrent.SettableFuture;
64 * org.opendaylight.controller.md.statistics.manager.impl
66 * StatRpcMsgManagerImpl
67 * Class register and provide all RPC Statistics Device Services and implement pre-defined
68 * wrapped methods for prepare easy access to RPC Statistics Device Services like getAllStatisticsFor...
70 * In next Class implement process for joining multipart messages.
71 * Class internally use two WeakHashMap and GuavaCache for holding values for joining multipart msg.
72 * One Weak map is used for holding all Multipart Messages and second is used for possible input
73 * Config/DS light-weight DataObject (DataObject contains only necessary identification fields as
74 * TableId, GroupId, MeterId or for flow Match, Priority, FlowCookie, TableId and FlowId ...
76 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
79 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
81 private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
83 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
85 private final long maxLifeForRequest = 50; /* 50 second */
86 private final int queueCapacity = 5000;
88 private final OpendaylightGroupStatisticsService groupStatsService;
89 private final OpendaylightMeterStatisticsService meterStatsService;
90 private final OpendaylightFlowStatisticsService flowStatsService;
91 private final OpendaylightPortStatisticsService portStatsService;
92 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
93 private final OpendaylightQueueStatisticsService queueStatsService;
95 private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
97 private volatile boolean finishing = false;
99 public StatRpcMsgManagerImpl (final StatisticsManager manager,
100 final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
101 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
102 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
103 groupStatsService = Preconditions.checkNotNull(
104 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
105 "OpendaylightGroupStatisticsService can not be null!");
106 meterStatsService = Preconditions.checkNotNull(
107 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
108 "OpendaylightMeterStatisticsService can not be null!");
109 flowStatsService = Preconditions.checkNotNull(
110 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
111 "OpendaylightFlowStatisticsService can not be null!");
112 portStatsService = Preconditions.checkNotNull(
113 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
114 "OpendaylightPortStatisticsService can not be null!");
115 flowTableStatsService = Preconditions.checkNotNull(
116 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
117 "OpendaylightFlowTableStatisticsService can not be null!");
118 queueStatsService = Preconditions.checkNotNull(
119 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
120 "OpendaylightQueueStatisticsService can not be null!");
122 statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
123 txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
124 .maximumSize(10000).build();
128 public void close() {
130 statsRpcJobQueue = null;
135 /* Neverending cyle - wait for finishing */
136 while ( ! finishing) {
138 statsRpcJobQueue.take().call();
140 catch (final Exception e) {
141 LOG.warn("Stat Element RPC executor fail!", e);
144 // Drain all rpcCall, making sure any blocked threads are unblocked
145 while ( ! statsRpcJobQueue.isEmpty()) {
146 statsRpcJobQueue.poll();
150 private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
151 final boolean success = statsRpcJobQueue.offer(getAllStatJob);
153 LOG.warn("Put RPC request getAllStat fail! Queue is full.");
157 private void addStatJob(final RpcJobsQueue getStatJob) {
158 final boolean success = statsRpcJobQueue.offer(getStatJob);
160 LOG.debug("Put RPC request for getStat fail! Queue is full.");
165 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
166 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
168 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
169 new FutureCallback<RpcResult<? extends TransactionAware>>() {
172 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
173 final TransactionId id = result.getResult().getTransactionId();
175 LOG.warn("No protocol support");
177 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
178 final String cacheKey = buildCacheKey(id, nodeKey.getId());
179 final TransactionCacheContainer<? super TransactionAware> container =
180 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
181 txCache.put(cacheKey, container);
186 public void onFailure(final Throwable t) {
187 LOG.warn("Response Registration for Statistics RPC call fail!", t);
193 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
194 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
198 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
199 final TransactionId id, final NodeId nodeId) {
200 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
201 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
203 final String key = buildCacheKey(id, nodeId);
204 final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
206 final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
209 public Void call() throws Exception {
210 final Optional<TransactionCacheContainer<?>> resultContainer =
211 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
212 if (resultContainer.isPresent()) {
213 txCache.invalidate(key);
215 result.set(resultContainer);
219 addStatJob(getTransactionCacheContainer);
224 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
225 Preconditions.checkArgument(id != null, "TransactionId can not be null!");
226 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
228 final String key = buildCacheKey(id, nodeId);
229 final SettableFuture<Boolean> checkStatId = SettableFuture.create();
231 final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
234 public Void call() throws Exception {
235 final Optional<TransactionCacheContainer<?>> result =
236 Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
237 checkStatId.set(Boolean.valueOf(result.isPresent()));
241 addStatJob(isExpecedStatistics);
246 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
247 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
248 Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
250 final RpcJobsQueue addNotification = new RpcJobsQueue() {
253 public Void call() throws Exception {
254 final TransactionId txId = notification.getTransactionId();
255 final String key = buildCacheKey(txId, nodeId);
256 final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
257 if (container != null) {
258 container.addNotif(notification);
263 addStatJob(addNotification);
267 public void getAllGroupsStat(final NodeRef nodeRef) {
268 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
269 final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
272 public Void call() throws Exception {
273 final GetAllGroupStatisticsInputBuilder builder =
274 new GetAllGroupStatisticsInputBuilder();
275 builder.setNode(nodeRef);
276 registrationRpcFutureCallBack(groupStatsService
277 .getAllGroupStatistics(builder.build()), null, nodeRef);
281 addGetAllStatJob(getAllGroupStat);
285 public void getAllMetersStat(final NodeRef nodeRef) {
286 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
287 final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
290 public Void call() throws Exception {
291 final GetAllMeterStatisticsInputBuilder builder =
292 new GetAllMeterStatisticsInputBuilder();
293 builder.setNode(nodeRef);
294 registrationRpcFutureCallBack(meterStatsService
295 .getAllMeterStatistics(builder.build()), null, nodeRef);
299 addGetAllStatJob(getAllMeterStat);
303 public void getAllFlowsStat(final NodeRef nodeRef) {
304 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
305 final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
308 public Void call() throws Exception {
309 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
310 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
311 builder.setNode(nodeRef);
312 registrationRpcFutureCallBack(flowStatsService
313 .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
317 addGetAllStatJob(getAllFlowStat);
321 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
322 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
323 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
324 final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
327 public Void call() throws Exception {
328 final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
329 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
330 builder.setNode(nodeRef);
331 builder.setTableId(tableId);
333 final TableBuilder tbuilder = new TableBuilder();
334 tbuilder.setId(tableId.getValue());
335 tbuilder.setKey(new TableKey(tableId.getValue()));
336 registrationRpcFutureCallBack(flowStatsService
337 .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
341 addGetAllStatJob(getAggregateFlowStat);
345 public void getAllPortsStat(final NodeRef nodeRef) {
346 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
347 final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
350 public Void call() throws Exception {
351 final GetAllNodeConnectorsStatisticsInputBuilder builder =
352 new GetAllNodeConnectorsStatisticsInputBuilder();
353 builder.setNode(nodeRef);
354 registrationRpcFutureCallBack(portStatsService
355 .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
359 addGetAllStatJob(getAllPortsStat);
363 public void getAllTablesStat(final NodeRef nodeRef) {
364 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
365 final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
368 public Void call() throws Exception {
369 final GetFlowTablesStatisticsInputBuilder builder =
370 new GetFlowTablesStatisticsInputBuilder();
371 builder.setNode(nodeRef);
372 registrationRpcFutureCallBack(flowTableStatsService
373 .getFlowTablesStatistics(builder.build()), null, nodeRef);
377 addGetAllStatJob(getAllTableStat);
381 public void getAllQueueStat(final NodeRef nodeRef) {
382 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
383 final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
386 public Void call() throws Exception {
387 final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
388 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
389 builder.setNode(nodeRef);
390 registrationRpcFutureCallBack(queueStatsService
391 .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
395 addGetAllStatJob(getAllQueueStat);
399 public void getAllMeterConfigStat(final NodeRef nodeRef) {
400 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
401 final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
404 public Void call() throws Exception {
405 final GetAllMeterConfigStatisticsInputBuilder builder =
406 new GetAllMeterConfigStatisticsInputBuilder();
407 builder.setNode(nodeRef);
408 registrationRpcFutureCallBack(meterStatsService
409 .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
413 addGetAllStatJob(qetAllMeterConfStat);
417 public void getGroupFeaturesStat(final NodeRef nodeRef) {
418 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
419 final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
422 public Void call() throws Exception {
424 final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
425 input.setNode(nodeRef);
426 registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
430 addStatJob(getGroupFeaturesStat);
434 public void getMeterFeaturesStat(final NodeRef nodeRef) {
435 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
436 final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
439 public Void call() throws Exception {
441 final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
442 input.setNode(nodeRef);
443 registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
447 addStatJob(getMeterFeaturesStat);
451 public void getAllGroupsConfStats(final NodeRef nodeRef) {
452 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
453 final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
456 public Void call() throws Exception {
457 Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
458 final GetGroupDescriptionInputBuilder builder =
459 new GetGroupDescriptionInputBuilder();
460 builder.setNode(nodeRef);
461 registrationRpcFutureCallBack(groupStatsService
462 .getGroupDescription(builder.build()), null, nodeRef);
467 addGetAllStatJob(getAllGropConfStat);
470 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
472 private final TransactionId id;
473 private final NodeId nId;
474 private final List<T> notifications;
475 private final Optional<? extends DataObject> confInput;
477 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
478 this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
479 notifications = new CopyOnWriteArrayList<T>();
480 confInput = Optional.fromNullable(input);
485 public void addNotif(final T notif) {
486 notifications.add(notif);
490 public TransactionId getId() {
495 public NodeId getNodeId() {
500 public List<T> getNotifications() {
501 return notifications;
505 public Optional<? extends DataObject> getConfInput() {