2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import java.math.BigInteger;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
19 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
20 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
21 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
/**
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * This class looks up and provides all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to those services, like getAllStatisticsFor...
 *
 * The class also implements the process of joining multipart messages.
 * Internally it uses two Guava caches to hold the values needed for joining multipart messages.
 * One cache holds the transaction containers, each of which collects all multipart messages of a
 * transaction together with the possible input Config/DS light-weight DataObject (a DataObject
 * that contains only the necessary identification fields such as TableId, GroupId, MeterId or,
 * for a flow, Match, Priority, FlowCookie, TableId and FlowId ...). The second cache holds the
 * futures handed out while a transaction container has not been cached yet.
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 */
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
85 private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
87 private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
90 * Cache for futures to be returned by
91 * {@link #isExpectedStatistics(TransactionId, NodeId)}.
93 private final Cache<String, SettableFuture<Boolean>> txFutureCache;
96 * The number of seconds to wait for transaction container to be put into
99 private static final long TXCACHE_WAIT_TIMEOUT = 10L;
101 private static final int MAX_CACHE_SIZE = 10000;
103 private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
104 private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
105 private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
107 * Number of possible statistic which are waiting for notification
108 * - check it in StatPermCollectorImpl method collectStatCrossNetwork()
110 private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
112 private final OpendaylightGroupStatisticsService groupStatsService;
113 private final OpendaylightMeterStatisticsService meterStatsService;
114 private final OpendaylightFlowStatisticsService flowStatsService;
115 private final OpendaylightPortStatisticsService portStatsService;
116 private final OpendaylightFlowTableStatisticsService flowTableStatsService;
117 private final OpendaylightQueueStatisticsService queueStatsService;
119 public StatRpcMsgManagerImpl (final StatisticsManager manager,
120 final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
121 Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
122 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
123 groupStatsService = Preconditions.checkNotNull(
124 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
125 "OpendaylightGroupStatisticsService can not be null!");
126 meterStatsService = Preconditions.checkNotNull(
127 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
128 "OpendaylightMeterStatisticsService can not be null!");
129 flowStatsService = Preconditions.checkNotNull(
130 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
131 "OpendaylightFlowStatisticsService can not be null!");
132 portStatsService = Preconditions.checkNotNull(
133 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
134 "OpendaylightPortStatisticsService can not be null!");
135 flowTableStatsService = Preconditions.checkNotNull(
136 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
137 "OpendaylightFlowTableStatisticsService can not be null!");
138 queueStatsService = Preconditions.checkNotNull(
139 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
140 "OpendaylightQueueStatisticsService can not be null!");
142 txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
143 .maximumSize(MAX_CACHE_SIZE).build();
144 txFutureCache = CacheBuilder.newBuilder().
145 expireAfterWrite(TXCACHE_WAIT_TIMEOUT, TimeUnit.SECONDS).
146 maximumSize(MAX_CACHE_SIZE).build();
150 public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
151 final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
152 final SettableFuture<TransactionId> resultTransId) {
154 class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
156 public void onSuccess(final RpcResult<? extends TransactionAware> result) {
157 final TransactionId id = result.getResult().getTransactionId();
158 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
160 String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
161 LOG.warn("Node [{}] does not support statistics request type : {}",
162 nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
163 if (resultTransId != null) {
164 resultTransId.setException(
165 new UnsupportedOperationException());
168 if (resultTransId != null) {
169 resultTransId.set(id);
171 final String cacheKey = buildCacheKey(id, nodeKey.getId());
172 final TransactionCacheContainer<? super TransactionAware> container =
173 new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
174 putTransaction(cacheKey, container);
179 public void onFailure(final Throwable t) {
180 LOG.warn("Response Registration for Statistics RPC call fail!", t);
181 if (resultTransId != null) {
182 if (t instanceof DOMRpcImplementationNotAvailableException) {
183 //If encountered with RPC not availabe exception, retry till
184 // stats manager remove the node from the stats collector pool
185 resultTransId.set(StatPermCollectorImpl.getFakeTxId());
187 resultTransId.setException(t);
193 Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
196 private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
197 return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
201 * Put the given statistics transaction container into the cache.
203 * @param key Key that specifies the given transaction container.
204 * @param container Transaction container.
206 private synchronized void putTransaction(
207 String key, TransactionCacheContainer<? super TransactionAware> container) {
208 txCache.put(key, container);
210 SettableFuture<Boolean> future = txFutureCache.asMap().remove(key);
211 if (future != null) {
212 // Wake up a thread waiting for this transaction container.
218 * Check to see if the specified transaction container is cached in
221 * @param key Key that specifies the transaction container.
222 * @return A future that will contain the result.
224 private synchronized Future<Boolean> isExpectedStatistics(String key) {
225 Future<Boolean> future;
226 TransactionCacheContainer<?> container = txCache.getIfPresent(key);
227 if (container == null) {
228 // Wait for the transaction container to be put into the cache.
229 SettableFuture<Boolean> f = SettableFuture.<Boolean>create();
230 SettableFuture<Boolean> current =
231 txFutureCache.asMap().putIfAbsent(key, f);
232 future = (current == null) ? f : current;
234 future = Futures.immediateFuture(Boolean.TRUE);
241 public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
242 final TransactionId id, final NodeId nodeId) {
243 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
244 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
246 String key = buildCacheKey(id, nodeId);
247 Optional<TransactionCacheContainer<?>> resultContainer =
248 Optional.<TransactionCacheContainer<?>> fromNullable(
249 txCache.asMap().remove(key));
250 if (!resultContainer.isPresent()) {
251 LOG.warn("Transaction cache not found: {}", key);
254 return Futures.immediateFuture(resultContainer);
258 public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
259 Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
260 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
262 String key = buildCacheKey(id, nodeId);
263 return isExpectedStatistics(key);
267 public void addNotification(final TransactionAware notification, final NodeId nodeId) {
268 Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
269 Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
271 TransactionId txId = notification.getTransactionId();
272 String key = buildCacheKey(txId, nodeId);
273 TransactionCacheContainer<? super TransactionAware> container =
274 txCache.getIfPresent(key);
275 if (container != null) {
276 container.addNotif(notification);
278 LOG.warn("Unable to add notification: {}, {}", key,
279 notification.getImplementedInterface());
284 public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
285 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
286 SettableFuture<TransactionId> result = SettableFuture.create();
287 GetAllGroupStatisticsInputBuilder builder =
288 new GetAllGroupStatisticsInputBuilder();
289 builder.setNode(nodeRef);
290 registrationRpcFutureCallBack(
291 groupStatsService.getAllGroupStatistics(builder.build()), null,
297 public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
298 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
299 SettableFuture<TransactionId> result = SettableFuture.create();
300 GetAllMeterStatisticsInputBuilder builder =
301 new GetAllMeterStatisticsInputBuilder();
302 builder.setNode(nodeRef);
303 registrationRpcFutureCallBack(
304 meterStatsService.getAllMeterStatistics(builder.build()), null,
310 public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
311 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
312 SettableFuture<TransactionId> result = SettableFuture.create();
313 GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
314 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
315 builder.setNode(nodeRef);
316 registrationRpcFutureCallBack(
317 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(builder.build()),
318 null, nodeRef, result);
323 public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
324 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
325 Preconditions.checkArgument(tableId != null, "TableId can not be null!");
326 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
327 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
328 builder.setNode(nodeRef).setTableId(tableId);
330 TableBuilder tbuilder = new TableBuilder().
331 setId(tableId.getValue()).
332 setKey(new TableKey(tableId.getValue()));
333 registrationRpcFutureCallBack(
334 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()),
335 tbuilder.build(), nodeRef, null);
339 public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
340 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
341 SettableFuture<TransactionId> result = SettableFuture.create();
342 GetAllNodeConnectorsStatisticsInputBuilder builder =
343 new GetAllNodeConnectorsStatisticsInputBuilder();
344 builder.setNode(nodeRef);
345 Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
346 portStatsService.getAllNodeConnectorsStatistics(builder.build());
347 registrationRpcFutureCallBack(rpc, null, nodeRef, result);
352 public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
353 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
354 SettableFuture<TransactionId> result = SettableFuture.create();
355 GetFlowTablesStatisticsInputBuilder builder =
356 new GetFlowTablesStatisticsInputBuilder();
357 builder.setNode(nodeRef);
358 registrationRpcFutureCallBack(
359 flowTableStatsService.getFlowTablesStatistics(builder.build()),
360 null, nodeRef, result);
365 public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
366 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
367 SettableFuture<TransactionId> result = SettableFuture.create();
368 GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
369 new GetAllQueuesStatisticsFromAllPortsInputBuilder();
370 builder.setNode(nodeRef);
371 registrationRpcFutureCallBack(
372 queueStatsService.getAllQueuesStatisticsFromAllPorts(builder.build()),
373 null, nodeRef, result);
378 public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
379 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
380 SettableFuture<TransactionId> result = SettableFuture.create();
381 GetAllMeterConfigStatisticsInputBuilder builder =
382 new GetAllMeterConfigStatisticsInputBuilder();
383 builder.setNode(nodeRef);
384 registrationRpcFutureCallBack(
385 meterStatsService.getAllMeterConfigStatistics(builder.build()),
386 null, nodeRef, result);
391 public void getGroupFeaturesStat(final NodeRef nodeRef) {
392 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
393 GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder().
395 registrationRpcFutureCallBack(
396 groupStatsService.getGroupFeatures(input.build()), null, nodeRef,
401 public void getMeterFeaturesStat(final NodeRef nodeRef) {
402 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
403 GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder().
405 registrationRpcFutureCallBack(
406 meterStatsService.getMeterFeatures(input.build()), null, nodeRef,
411 public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
412 Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
413 SettableFuture<TransactionId> result = SettableFuture.create();
414 GetGroupDescriptionInputBuilder builder =
415 new GetGroupDescriptionInputBuilder();
416 builder.setNode(nodeRef);
417 registrationRpcFutureCallBack(
418 groupStatsService.getGroupDescription(builder.build()), null,
423 public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
425 private final TransactionId id;
426 private final NodeId nId;
427 private final List<T> notifications;
428 private final Optional<? extends DataObject> confInput;
430 public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
431 this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
432 notifications = new CopyOnWriteArrayList<T>();
433 confInput = Optional.fromNullable(input);
438 public void addNotif(final T notif) {
439 notifications.add(notif);
443 public TransactionId getId() {
448 public NodeId getNodeId() {
453 public List<T> getNotifications() {
454 return notifications;
458 public Optional<? extends DataObject> getConfInput() {