Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.UUID;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingDeque;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
32 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
33 import org.opendaylight.openflowplugin.applications.statistics.manager.StatListeningCommiter;
34 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
35 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
36 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
37 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
38 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
39 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57 * statistics-manager
58 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
59 *
60 * StatisticsManagerImpl
61 * It represent a central point for whole module. Implementation
62 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
63 * Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
64 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
65 * In next, StatisticsManager provides all DS contact Transaction services.
66 *
67 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
68 *
69 */
70 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
71
72    private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
73
74    private static final int QUEUE_DEPTH = 5000;
75    private static final int MAX_BATCH = 100;
76
77    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
78    private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
79    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
80
81
82     private final DataBroker dataBroker;
83    private final ExecutorService statDataStoreOperationServ;
84    private EntityOwnershipService ownershipService;
85    private StatRpcMsgManager rpcMsgManager;
86    private List<StatPermCollector> statCollectors;
87    private final Object statCollectorLock = new Object();
88    private BindingTransactionChain txChain;
89    private volatile boolean finishing = false;
90
91    private StatNodeRegistration nodeRegistrator;
92    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
93    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
94    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
95    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
96    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
97    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
98
99    private final StatisticsManagerConfig statManagerConfig;
100
101    public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
102        statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
103        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
104        ThreadFactory threadFact;
105        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
106        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
107        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
108        txChain =  dataBroker.createTransactionChain(this);
109    }
110
111    @Override
112    public void start(final NotificationProviderService notifService,
113            final RpcConsumerRegistry rpcRegistry) {
114        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
115        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
116        statCollectors = Collections.emptyList();
117        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
118        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
119        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
120        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
121        tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
122        portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
123        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
124
125        statDataStoreOperationServ.execute(this);
126        LOG.info("Statistics Manager started successfully!");
127    }
128
129    private <T extends AutoCloseable> T close(final T closeable) throws Exception {
130        if (closeable != null) {
131            closeable.close();
132        }
133        return null;
134    }
135
136    @Override
137    public void close() throws Exception {
138        LOG.info("StatisticsManager close called");
139        finishing = true;
140        nodeRegistrator = close(nodeRegistrator);
141        flowListeningCommiter = close(flowListeningCommiter);
142        meterListeningCommiter = close(meterListeningCommiter);
143        groupListeningCommiter = close(groupListeningCommiter);
144        tableNotifCommiter = close(tableNotifCommiter);
145        portNotifyCommiter = close(portNotifyCommiter);
146        queueNotifyCommiter = close(queueNotifyCommiter);
147        if (statCollectors != null) {
148            for (StatPermCollector collector : statCollectors) {
149                collector = close(collector);
150            }
151            statCollectors = null;
152        }
153        rpcMsgManager = null;
154        statDataStoreOperationServ.shutdown();
155        txChain = close(txChain);
156    }
157
158    @Override
159    public void enqueue(final StatDataStoreOperation op) {
160        // we don't need to block anything - next statistics come soon
161        final boolean success = dataStoreOperQueue.offer(op);
162        if ( ! success) {
163            LOG.debug("Stat DS/Operational submitter Queue is full!");
164        }
165    }
166
167    @Override
168    public void run() {
169        /* Neverending cyle - wait for finishing */
170        while ( ! finishing) {
171            StatDataStoreOperation op = null;
172            try {
173                op = dataStoreOperQueue.take();
174                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
175                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
176
177                int ops = 0;
178                do {
179                    Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
180                    if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
181                        // dont apply operations for nodes which have been disconnected or if there uuids do not match
182                        // this can happen if operations are queued and node is removed.
183                        // if the uuids dont match, it means that the stat operation are stale and belong to the same node
184                        // which got disconnected and connected again.
185                        op.applyOperation(tx);
186                        ops++;
187                    } else {
188                        LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
189                    }
190
191                    if (ops < MAX_BATCH) {
192                        op = dataStoreOperQueue.poll();
193                    } else {
194                        op = null;
195                    }
196                } while (op != null);
197
198                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
199
200                tx.submit().checkedGet();
201            } catch (final InterruptedException e) {
202                LOG.warn("Stat Manager DS Operation thread interrupted, while " +
203                        "waiting for StatDataStore Operation task!", e);
204                finishing = true;
205            } catch (final Exception e) {
206                LOG.warn("Unhandled exception during processing statistics for {}. " +
207                        "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
208                txChain.close();
209                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
210                cleanDataStoreOperQueue();
211            }
212        }
213        // Drain all events, making sure any blocked threads are unblocked
214        cleanDataStoreOperQueue();
215    }
216
217    private synchronized void cleanDataStoreOperQueue() {
218        // Drain all events, making sure any blocked threads are unblocked
219        while (! dataStoreOperQueue.isEmpty()) {
220            dataStoreOperQueue.poll();
221        }
222    }
223
224    @Override
225    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
226            final Throwable cause) {
227        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
228    }
229
230    @Override
231    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
232        // NOOP
233    }
234
235    @Override
236    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
237        for (final StatPermCollector collector : statCollectors) {
238            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
239                return true;
240            }
241        }
242        return false;
243    }
244
245    @Override
246    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
247        for (final StatPermCollector collector : statCollectors) {
248            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
249                collector.collectNextStatistics(xid);
250            }
251        }
252    }
253
254     @Override
255     public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
256             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
257
258
259         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
260         if (collectorUUIDPair == null) {
261             // no collector contains this node,
262             // check if one of the collectors can accommodate it
263             // if no then add a new collector
264
265             synchronized(statCollectorLock) {
266                 for (int i = statCollectors.size() - 1; i >= 0; i--) {
267                     // start from back of the list as most likely previous ones might be full
268                     final StatPermCollector aCollector = statCollectors.get(i);
269                     if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
270                         // if the collector returns true after adding node, then return
271                         nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
272                         LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
273                                 numNodesBeingCollected.incrementAndGet());
274                         return;
275                     }
276                 }
277                 // no collector was able to add this node
278                 LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
279                 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
280                         statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
281                         statManagerConfig.getMaxNodesForCollector());
282
283                 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
284                 statCollectorsNew.add(newCollector);
285                 statCollectors = Collections.unmodifiableList(statCollectorsNew);
286                 nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
287                 LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
288
289                 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
290             }
291
292
293         } else {
294             // add to the collector, even if it rejects it.
295             collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
296         }
297     }
298
299
300     @Override
301     public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
302         flowListeningCommiter.cleanForDisconnect(nodeIdent);
303
304         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
305         if (collectorUUIDPair != null) {
306             StatPermCollector collector = collectorUUIDPair.getLeft();
307             if (collector != null) {
308                 nodeCollectorMap.remove(nodeIdent);
309                 LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
310
311                 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
312                     if (!collector.hasActiveNodes()) {
313                         synchronized (statCollectorLock) {
314                             if (collector.hasActiveNodes()) {
315                                 return;
316                             }
317                             final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
318                             newStatColl.remove(collector);
319                             statCollectors = Collections.unmodifiableList(newStatColl);
320                         }
321                     }
322                     LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
323                 } else {
324                     LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
325                 }
326             } else {
327                 LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
328             }
329
330         } else {
331             LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
332         }
333     }
334
335    @Override
336    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
337            final StatCapabTypes statCapab) {
338        for (final StatPermCollector collector : statCollectors) {
339            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
340                return;
341            }
342        }
343        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
344    }
345
346     @Override
347     public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
348                                               final StatCapabTypes statCapab) {
349         for (final StatPermCollector collector : statCollectors) {
350             if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
351                 return;
352             }
353         }
354         LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
355     }
356
357    /* Getter internal Statistic Manager Job Classes */
358    @Override
359    public StatRpcMsgManager getRpcMsgManager() {
360        return rpcMsgManager;
361    }
362
363    @Override
364    public StatNodeRegistration getNodeRegistrator() {
365        return nodeRegistrator;
366    }
367
368    @Override
369    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
370        return flowListeningCommiter;
371    }
372
373    @Override
374    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
375        return meterListeningCommiter;
376    }
377
378    @Override
379    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
380        return groupListeningCommiter;
381    }
382
383    @Override
384    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
385        return queueNotifyCommiter;
386    }
387
388
389    @Override
390    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
391        return tableNotifCommiter;
392    }
393
394    @Override
395    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
396        return portNotifyCommiter;
397    }
398
399     @Override
400     public StatisticsManagerConfig getConfiguration() {
401         return statManagerConfig;
402     }
403
404     @Override
405     public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
406         Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
407         if (permCollectorUUIDPair != null) {
408             return permCollectorUUIDPair.getRight();
409         }
410         // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
411         return UUID.fromString("invalid-uuid");
412     }
413
414     @Override
415     public void setOwnershipService(EntityOwnershipService ownershipService) {
416         this.ownershipService = ownershipService;
417     }
418
419     @Override
420     public EntityOwnershipService getOwnershipService() {
421         return this.ownershipService;
422     }
423
424 }
425