Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingDeque;
18 import java.util.concurrent.ThreadFactory;
19
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
26 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
27 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
28 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
29 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
30 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
31 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
32 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
33 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import com.google.common.base.Preconditions;
51 import com.google.common.util.concurrent.ThreadFactoryBuilder;
52
53 /**
54 * statistics-manager
55 * org.opendaylight.controller.md.statistics.manager.impl
56 *
57 * StatisticsManagerImpl
58 * It represent a central point for whole module. Implementation
59 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
60 * Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
61 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
62 * In next, StatisticsManager provides all DS contact Transaction services.
63 *
64 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
65 *
66 */
67 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
68
69    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
70
71    private static final int QUEUE_DEPTH = 5000;
72    private static final int MAX_BATCH = 100;
73
74    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
75
76    private final DataBroker dataBroker;
77    private final ExecutorService statRpcMsgManagerExecutor;
78    private final ExecutorService statDataStoreOperationServ;
79    private StatRpcMsgManager rpcMsgManager;
80    private List<StatPermCollector> statCollectors;
81    private final Object statCollectorLock = new Object();
82    private BindingTransactionChain txChain;
83    private volatile boolean finishing = false;
84
85    private StatNodeRegistration nodeRegistrator;
86    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
87    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
88    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
89    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
90    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
91    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
92
93    private final StatisticsManagerConfig statManagerConfig;
94
95    public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
96        statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
97        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
98        ThreadFactory threadFact;
99        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
100        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
101        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
102        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
103        txChain =  dataBroker.createTransactionChain(this);
104    }
105
106    @Override
107    public void start(final NotificationProviderService notifService,
108            final RpcConsumerRegistry rpcRegistry) {
109        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
110        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
111        statCollectors = Collections.emptyList();
112        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
113        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
114        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
115        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
116        tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
117        portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
118        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
119
120        statRpcMsgManagerExecutor.execute(rpcMsgManager);
121        statDataStoreOperationServ.execute(this);
122        LOG.info("Statistics Manager started successfully!");
123    }
124
125    @Override
126    public void close() throws Exception {
127        LOG.info("StatisticsManager close called");
128        finishing = true;
129        if (nodeRegistrator != null) {
130            nodeRegistrator.close();
131            nodeRegistrator = null;
132        }
133        if (flowListeningCommiter != null) {
134            flowListeningCommiter.close();
135            flowListeningCommiter = null;
136        }
137        if (meterListeningCommiter != null) {
138            meterListeningCommiter.close();
139            meterListeningCommiter = null;
140        }
141        if (groupListeningCommiter != null) {
142            groupListeningCommiter.close();
143            groupListeningCommiter = null;
144        }
145        if (tableNotifCommiter != null) {
146            tableNotifCommiter.close();
147            tableNotifCommiter = null;
148        }
149        if (portNotifyCommiter != null) {
150            portNotifyCommiter.close();
151            portNotifyCommiter = null;
152        }
153        if (queueNotifyCommiter != null) {
154            queueNotifyCommiter.close();
155            queueNotifyCommiter = null;
156        }
157        if (statCollectors != null) {
158            for (StatPermCollector collector : statCollectors) {
159                collector.close();
160                collector = null;
161            }
162            statCollectors = null;
163        }
164        if (rpcMsgManager != null) {
165            rpcMsgManager.close();
166            rpcMsgManager = null;
167        }
168        statRpcMsgManagerExecutor.shutdown();
169        statDataStoreOperationServ.shutdown();
170        if (txChain != null) {
171            txChain.close();
172            txChain = null;
173        }
174    }
175
176    @Override
177    public void enqueue(final StatDataStoreOperation op) {
178        // we don't need to block anything - next statistics come soon
179        final boolean success = dataStoreOperQueue.offer(op);
180        if ( ! success) {
181            LOG.debug("Stat DS/Operational submiter Queue is full!");
182        }
183    }
184
185    @Override
186    public void run() {
187        /* Neverending cyle - wait for finishing */
188        while ( ! finishing) {
189            try {
190                StatDataStoreOperation op = dataStoreOperQueue.take();
191                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
192                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
193
194                int ops = 0;
195                do {
196                    op.applyOperation(tx);
197
198                    ops++;
199                    if (ops < MAX_BATCH) {
200                        op = dataStoreOperQueue.poll();
201                    } else {
202                        op = null;
203                    }
204                } while (op != null);
205
206                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
207
208                    tx.submit().checkedGet();
209            } catch (final InterruptedException e) {
210                LOG.warn("Stat Manager DS Operation thread interupted!", e);
211                finishing = true;
212            } catch (final Exception e) {
213                LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
214                txChain.close();
215                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
216                cleanDataStoreOperQueue();
217            }
218        }
219        // Drain all events, making sure any blocked threads are unblocked
220        cleanDataStoreOperQueue();
221    }
222
223    private synchronized void cleanDataStoreOperQueue() {
224        // Drain all events, making sure any blocked threads are unblocked
225        while (! dataStoreOperQueue.isEmpty()) {
226            dataStoreOperQueue.poll();
227        }
228    }
229
230    @Override
231    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
232            final Throwable cause) {
233        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
234    }
235
236    @Override
237    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
238        // NOOP
239    }
240
241    @Override
242    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
243        for (final StatPermCollector collector : statCollectors) {
244            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
245                return true;
246            }
247        }
248        return false;
249    }
250
251    @Override
252    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
253        for (final StatPermCollector collector : statCollectors) {
254            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
255                collector.collectNextStatistics(xid);
256            }
257        }
258    }
259
260    @Override
261    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
262            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
263        for (final StatPermCollector collector : statCollectors) {
264            if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
265                return;
266            }
267        }
268        synchronized (statCollectorLock) {
269            for (final StatPermCollector collector : statCollectors) {
270                if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
271                    return;
272                }
273            }
274            final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
275                    statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
276                    statManagerConfig.getMaxNodesForCollector());
277            final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
278            newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
279            statCollectorsNew.add(newCollector);
280            statCollectors = Collections.unmodifiableList(statCollectorsNew);
281        }
282    }
283
284    @Override
285    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
286        flowListeningCommiter.cleanForDisconnect(nodeIdent);
287
288        for (final StatPermCollector collector : statCollectors) {
289            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
290                if ( ! collector.hasActiveNodes()) {
291                    synchronized (statCollectorLock) {
292                        if (collector.hasActiveNodes()) {
293                            return;
294                        }
295                        final List<StatPermCollector> newStatColl =
296                                new ArrayList<>(statCollectors);
297                        newStatColl.remove(collector);
298                        statCollectors = Collections.unmodifiableList(newStatColl);
299                    }
300                }
301                return;
302            }
303        }
304        LOG.debug("Node {} has not been removed.", nodeIdent);
305    }
306
307    @Override
308    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
309            final StatCapabTypes statCapab) {
310        for (final StatPermCollector collector : statCollectors) {
311            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
312                return;
313            }
314        }
315        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
316    }
317
318    /* Getter internal Statistic Manager Job Classes */
319    @Override
320    public StatRpcMsgManager getRpcMsgManager() {
321        return rpcMsgManager;
322    }
323
324    @Override
325    public StatNodeRegistration getNodeRegistrator() {
326        return nodeRegistrator;
327    }
328
329    @Override
330    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
331        return flowListeningCommiter;
332    }
333
334    @Override
335    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
336        return meterListeningCommiter;
337    }
338
339    @Override
340    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
341        return groupListeningCommiter;
342    }
343
344    @Override
345    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
346        return queueNotifyCommiter;
347    }
348
349
350    @Override
351    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
352        return tableNotifCommiter;
353    }
354
355    @Override
356    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
357        return portNotifyCommiter;
358    }
359
360     @Override
361     public StatisticsManagerConfig getConfiguration() {
362         return statManagerConfig;
363     }
364 }
365