Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatRpcMsgManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import java.math.BigInteger;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.CopyOnWriteArrayList;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
17
18 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
19 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
20 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
21 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Joiner;
55 import com.google.common.base.Optional;
56 import com.google.common.base.Preconditions;
57 import com.google.common.cache.Cache;
58 import com.google.common.cache.CacheBuilder;
59 import com.google.common.util.concurrent.FutureCallback;
60 import com.google.common.util.concurrent.Futures;
61 import com.google.common.util.concurrent.JdkFutureAdapters;
62 import com.google.common.util.concurrent.SettableFuture;
63
64
/**
 * statistics-manager
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
 *
 * StatRpcMsgManagerImpl
 * This class obtains all RPC Statistics Device Services and implements pre-defined
 * wrapper methods that give easy access to those services, like getAllStatisticsFor...
 *
 * The class also implements the process for joining multipart messages.
 * Internally it uses two Guava caches for holding the values needed to join multipart messages.
 * One cache holds the transaction containers, which collect all Multipart Messages together with
 * the possible input Config/DS light-weight DataObject (a DataObject containing only the necessary
 * identification fields such as TableId, GroupId, MeterId or, for a flow, Match, Priority,
 * FlowCookie, TableId and FlowId ...). The second cache holds the futures handed out to callers
 * that are waiting for a transaction container to be registered.
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 *
 */
82 public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
83
84
85     private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
86
87     private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
88
89     /**
90      * Cache for futures to be returned by
91      * {@link #isExpectedStatistics(TransactionId, NodeId)}.
92      */
93     private final Cache<String, SettableFuture<Boolean>>  txFutureCache;
94
95     /**
96      * The number of seconds to wait for transaction container to be put into
97      * {@link #txCache}.
98      */
99     private static final long TXCACHE_WAIT_TIMEOUT = 10L;
100
101     private static final int MAX_CACHE_SIZE = 10000;
102
103     private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
104     private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
105     private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
106     /**
107      *  Number of possible statistic which are waiting for notification
108      *      - check it in StatPermCollectorImpl method collectStatCrossNetwork()
109      */
110     private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
111
112     private final OpendaylightGroupStatisticsService groupStatsService;
113     private final OpendaylightMeterStatisticsService meterStatsService;
114     private final OpendaylightFlowStatisticsService flowStatsService;
115     private final OpendaylightPortStatisticsService portStatsService;
116     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
117     private final OpendaylightQueueStatisticsService queueStatsService;
118
119     public StatRpcMsgManagerImpl (final StatisticsManager manager,
120             final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
121         Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
122         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
123         groupStatsService = Preconditions.checkNotNull(
124                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
125                 "OpendaylightGroupStatisticsService can not be null!");
126         meterStatsService = Preconditions.checkNotNull(
127                 rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class),
128                 "OpendaylightMeterStatisticsService can not be null!");
129         flowStatsService = Preconditions.checkNotNull(
130                 rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class),
131                 "OpendaylightFlowStatisticsService can not be null!");
132         portStatsService = Preconditions.checkNotNull(
133                 rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class),
134                 "OpendaylightPortStatisticsService can not be null!");
135         flowTableStatsService = Preconditions.checkNotNull(
136                 rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class),
137                 "OpendaylightFlowTableStatisticsService can not be null!");
138         queueStatsService = Preconditions.checkNotNull(
139                 rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
140                 "OpendaylightQueueStatisticsService can not be null!");
141
142         txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
143                 .maximumSize(MAX_CACHE_SIZE).build();
144         txFutureCache = CacheBuilder.newBuilder().
145             expireAfterWrite(TXCACHE_WAIT_TIMEOUT, TimeUnit.SECONDS).
146             maximumSize(MAX_CACHE_SIZE).build();
147     }
148
149     @Override
150     public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
151             final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
152             final SettableFuture<TransactionId> resultTransId) {
153
154         class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
155             @Override
156             public void onSuccess(final RpcResult<? extends TransactionAware> result) {
157                 final TransactionId id = result.getResult().getTransactionId();
158                 final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
159                 if (id == null) {
160                     String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
161                     LOG.warn("Node [{}] does not support statistics request type : {}",
162                             nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
163                     if (resultTransId != null) {
164                         resultTransId.setException(
165                             new UnsupportedOperationException());
166                     }
167                 } else {
168                     if (resultTransId != null) {
169                         resultTransId.set(id);
170                     }
171                     final String cacheKey = buildCacheKey(id, nodeKey.getId());
172                     final TransactionCacheContainer<? super TransactionAware> container =
173                             new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
174                     putTransaction(cacheKey, container);
175                 }
176             }
177
178             @Override
179             public void onFailure(final Throwable t) {
180                 LOG.warn("Response Registration for Statistics RPC call fail!", t);
181                 if (resultTransId != null) {
182                     if (t instanceof DOMRpcImplementationNotAvailableException) {
183                         //If encountered with RPC not availabe exception, retry till
184                         // stats manager remove the node from the stats collector pool
185                         resultTransId.set(StatPermCollectorImpl.getFakeTxId());
186                     } else {
187                         resultTransId.setException(t);
188                     }
189                 }
190             }
191         }
192
193         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
194     }
195
196     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
197         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
198     }
199
200     /**
201      * Put the given statistics transaction container into the cache.
202      *
203      * @param key        Key that specifies the given transaction container.
204      * @param container  Transaction container.
205      */
206     private synchronized void putTransaction(
207         String key, TransactionCacheContainer<? super TransactionAware> container) {
208         txCache.put(key, container);
209
210         SettableFuture<Boolean> future = txFutureCache.asMap().remove(key);
211         if (future != null) {
212             // Wake up a thread waiting for this transaction container.
213             future.set(true);
214         }
215     }
216
217     /**
218      * Check to see if the specified transaction container is cached in
219      * {@link #txCache}.
220      *
221      * @param key  Key that specifies the transaction container.
222      * @return  A future that will contain the result.
223      */
224     private synchronized Future<Boolean> isExpectedStatistics(String key) {
225         Future<Boolean> future;
226         TransactionCacheContainer<?> container = txCache.getIfPresent(key);
227         if (container == null) {
228             // Wait for the transaction container to be put into the cache.
229             SettableFuture<Boolean> f = SettableFuture.<Boolean>create();
230             SettableFuture<Boolean> current =
231                 txFutureCache.asMap().putIfAbsent(key, f);
232             future = (current == null) ? f : current;
233         } else {
234             future = Futures.immediateFuture(Boolean.TRUE);
235         }
236
237         return future;
238     }
239
240     @Override
241     public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
242             final TransactionId id, final NodeId nodeId) {
243         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
244         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
245
246         String key = buildCacheKey(id, nodeId);
247         Optional<TransactionCacheContainer<?>> resultContainer =
248             Optional.<TransactionCacheContainer<?>> fromNullable(
249                 txCache.asMap().remove(key));
250         if (!resultContainer.isPresent()) {
251             LOG.warn("Transaction cache not found: {}", key);
252         }
253
254         return Futures.immediateFuture(resultContainer);
255     }
256
257     @Override
258     public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
259         Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
260         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
261
262         String key = buildCacheKey(id, nodeId);
263         return isExpectedStatistics(key);
264     }
265
266     @Override
267     public void addNotification(final TransactionAware notification, final NodeId nodeId) {
268         Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
269         Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
270
271         TransactionId txId = notification.getTransactionId();
272         String key = buildCacheKey(txId, nodeId);
273         TransactionCacheContainer<? super TransactionAware> container =
274             txCache.getIfPresent(key);
275         if (container != null) {
276             container.addNotif(notification);
277         } else {
278             LOG.warn("Unable to add notification: {}, {}", key,
279                      notification.getImplementedInterface());
280         }
281     }
282
283     @Override
284     public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
285         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
286         SettableFuture<TransactionId> result = SettableFuture.create();
287         GetAllGroupStatisticsInputBuilder builder =
288             new GetAllGroupStatisticsInputBuilder();
289         builder.setNode(nodeRef);
290         registrationRpcFutureCallBack(
291             groupStatsService.getAllGroupStatistics(builder.build()), null,
292             nodeRef, result);
293         return result;
294     }
295
296     @Override
297     public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
298         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
299         SettableFuture<TransactionId> result = SettableFuture.create();
300         GetAllMeterStatisticsInputBuilder builder =
301             new GetAllMeterStatisticsInputBuilder();
302         builder.setNode(nodeRef);
303         registrationRpcFutureCallBack(
304             meterStatsService.getAllMeterStatistics(builder.build()), null,
305             nodeRef, result);
306         return result;
307     }
308
309     @Override
310     public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
311         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
312         SettableFuture<TransactionId> result = SettableFuture.create();
313         GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
314             new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
315         builder.setNode(nodeRef);
316         registrationRpcFutureCallBack(
317             flowStatsService.getAllFlowsStatisticsFromAllFlowTables(builder.build()),
318             null, nodeRef, result);
319         return result;
320     }
321
322     @Override
323     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
324         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
325         Preconditions.checkArgument(tableId != null, "TableId can not be null!");
326         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
327             new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
328         builder.setNode(nodeRef).setTableId(tableId);
329
330         TableBuilder tbuilder = new TableBuilder().
331             setId(tableId.getValue()).
332             setKey(new TableKey(tableId.getValue()));
333         registrationRpcFutureCallBack(
334             flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()),
335             tbuilder.build(), nodeRef, null);
336     }
337
338     @Override
339     public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
340         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
341         SettableFuture<TransactionId> result = SettableFuture.create();
342         GetAllNodeConnectorsStatisticsInputBuilder builder =
343             new GetAllNodeConnectorsStatisticsInputBuilder();
344         builder.setNode(nodeRef);
345         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
346             portStatsService.getAllNodeConnectorsStatistics(builder.build());
347         registrationRpcFutureCallBack(rpc, null, nodeRef, result);
348         return result;
349     }
350
351     @Override
352     public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
353         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
354         SettableFuture<TransactionId> result = SettableFuture.create();
355         GetFlowTablesStatisticsInputBuilder builder =
356             new GetFlowTablesStatisticsInputBuilder();
357         builder.setNode(nodeRef);
358         registrationRpcFutureCallBack(
359             flowTableStatsService.getFlowTablesStatistics(builder.build()),
360             null, nodeRef, result);
361         return result;
362     }
363
364     @Override
365     public Future<TransactionId>  getAllQueueStat(final NodeRef nodeRef) {
366         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
367         SettableFuture<TransactionId> result = SettableFuture.create();
368         GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
369             new GetAllQueuesStatisticsFromAllPortsInputBuilder();
370         builder.setNode(nodeRef);
371         registrationRpcFutureCallBack(
372             queueStatsService.getAllQueuesStatisticsFromAllPorts(builder.build()),
373             null, nodeRef, result);
374         return result;
375     }
376
377     @Override
378     public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
379         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
380         SettableFuture<TransactionId> result = SettableFuture.create();
381         GetAllMeterConfigStatisticsInputBuilder builder =
382             new GetAllMeterConfigStatisticsInputBuilder();
383         builder.setNode(nodeRef);
384         registrationRpcFutureCallBack(
385             meterStatsService.getAllMeterConfigStatistics(builder.build()),
386             null, nodeRef, result);
387         return result;
388     }
389
390     @Override
391     public void getGroupFeaturesStat(final NodeRef nodeRef) {
392         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
393         GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder().
394             setNode(nodeRef);
395         registrationRpcFutureCallBack(
396             groupStatsService.getGroupFeatures(input.build()), null, nodeRef,
397             null);
398     }
399
400     @Override
401     public void getMeterFeaturesStat(final NodeRef nodeRef) {
402         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
403         GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder().
404             setNode(nodeRef);
405         registrationRpcFutureCallBack(
406             meterStatsService.getMeterFeatures(input.build()), null, nodeRef,
407             null);
408     }
409
410     @Override
411     public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
412         Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
413         SettableFuture<TransactionId> result = SettableFuture.create();
414         GetGroupDescriptionInputBuilder builder =
415             new GetGroupDescriptionInputBuilder();
416         builder.setNode(nodeRef);
417         registrationRpcFutureCallBack(
418             groupStatsService.getGroupDescription(builder.build()), null,
419             nodeRef, result);
420         return result;
421     }
422
423     public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
424
425         private final TransactionId id;
426         private final NodeId nId;
427         private final List<T> notifications;
428         private final Optional<? extends DataObject> confInput;
429
430         public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
431             this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
432             notifications = new CopyOnWriteArrayList<T>();
433             confInput = Optional.fromNullable(input);
434             nId = nodeId;
435         }
436
437         @Override
438         public void addNotif(final T notif) {
439             notifications.add(notif);
440         }
441
442         @Override
443         public TransactionId getId() {
444             return id;
445         }
446
447         @Override
448         public NodeId getNodeId() {
449             return nId;
450         }
451
452         @Override
453         public List<T> getNotifications() {
454             return notifications;
455         }
456
457         @Override
458         public Optional<? extends DataObject> getConfInput() {
459             return confInput;
460         }
461     }
462 }