Merge "BUG 1966 - change message logging level (info -> trace)"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingDeque;
18 import java.util.concurrent.ThreadFactory;
19
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
26 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
27 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
28 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
29 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
30 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
31 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
32 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
33 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.common.base.Preconditions;
50 import com.google.common.util.concurrent.ThreadFactoryBuilder;
51
52 /**
53 * statistics-manager
54 * org.opendaylight.controller.md.statistics.manager.impl
55 *
56 * StatisticsManagerImpl
57 * It represent a central point for whole module. Implementation
58 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
59 * Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
60 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
61 * In next, StatisticsManager provides all DS contact Transaction services.
62 *
63 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
64 *
65 */
66 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
67
68    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
69
70    private static final int QUEUE_DEPTH = 5000;
71    private static final int MAX_BATCH = 100;
72
73    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
74
75    private final DataBroker dataBroker;
76    private final int maxNodesForCollectors;
77    private long minReqNetMonitInt;
78    private final ExecutorService statRpcMsgManagerExecutor;
79    private final ExecutorService statDataStoreOperationServ;
80    private StatRpcMsgManager rpcMsgManager;
81    private List<StatPermCollector> statCollectors;
82    private final Object statCollectorLock = new Object();
83    private BindingTransactionChain txChain;
84    private volatile boolean finishing = false;
85
86    private StatNodeRegistration nodeRegistrator;
87    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
88    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
89    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
90    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
91    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
92    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
93
94    public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) {
95        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
96        ThreadFactory threadFact;
97        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
98        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
99        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
100        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
101        maxNodesForCollectors = maxNodesForCollector;
102        txChain =  dataBroker.createTransactionChain(this);
103    }
104
105    @Override
106    public void start(final NotificationProviderService notifService,
107            final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
108        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
109        this.minReqNetMonitInt = minReqNetMonitInt;
110        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt);
111        statCollectors = Collections.emptyList();
112        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
113        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
114        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
115        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
116        tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
117        portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
118        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
119
120        statRpcMsgManagerExecutor.execute(rpcMsgManager);
121        statDataStoreOperationServ.execute(this);
122        LOG.info("Statistics Manager started successfully!");
123    }
124
125    @Override
126    public void close() throws Exception {
127        finishing = true;
128        if (nodeRegistrator != null) {
129            nodeRegistrator.close();
130            nodeRegistrator = null;
131        }
132        if (flowListeningCommiter != null) {
133            flowListeningCommiter.close();
134            flowListeningCommiter = null;
135        }
136        if (meterListeningCommiter != null) {
137            meterListeningCommiter.close();
138            meterListeningCommiter = null;
139        }
140        if (groupListeningCommiter != null) {
141            groupListeningCommiter.close();
142            groupListeningCommiter = null;
143        }
144        if (tableNotifCommiter != null) {
145            tableNotifCommiter.close();
146            tableNotifCommiter = null;
147        }
148        if (portNotifyCommiter != null) {
149            portNotifyCommiter.close();
150            portNotifyCommiter = null;
151        }
152        if (queueNotifyCommiter != null) {
153            queueNotifyCommiter.close();
154            queueNotifyCommiter = null;
155        }
156        if (statCollectors != null) {
157            for (StatPermCollector collector : statCollectors) {
158                collector.close();
159                collector = null;
160            }
161            statCollectors = null;
162        }
163        if (rpcMsgManager != null) {
164            rpcMsgManager.close();
165            rpcMsgManager = null;
166        }
167        statRpcMsgManagerExecutor.shutdown();
168        statDataStoreOperationServ.shutdown();
169        if (txChain != null) {
170            txChain.close();
171            txChain = null;
172        }
173    }
174
175    @Override
176    public void enqueue(final StatDataStoreOperation op) {
177        // we don't need to block anything - next statistics come soon
178        final boolean success = dataStoreOperQueue.offer(op);
179        if ( ! success) {
180            LOG.debug("Stat DS/Operational submiter Queue is full!");
181        }
182    }
183
184    @Override
185    public void run() {
186        /* Neverending cyle - wait for finishing */
187        while ( ! finishing) {
188            try {
189                StatDataStoreOperation op = dataStoreOperQueue.take();
190                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
191                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
192
193                int ops = 0;
194                do {
195                    op.applyOperation(tx);
196
197                    ops++;
198                    if (ops < MAX_BATCH) {
199                        op = dataStoreOperQueue.poll();
200                    } else {
201                        op = null;
202                    }
203                } while (op != null);
204
205                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
206
207                    tx.submit().checkedGet();
208            } catch (final InterruptedException e) {
209                LOG.warn("Stat Manager DS Operation thread interupted!", e);
210                finishing = true;
211            } catch (final Exception e) {
212                LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
213                txChain.close();
214                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
215                cleanDataStoreOperQueue();
216            }
217        }
218        // Drain all events, making sure any blocked threads are unblocked
219        cleanDataStoreOperQueue();
220    }
221
222    private synchronized void cleanDataStoreOperQueue() {
223        // Drain all events, making sure any blocked threads are unblocked
224        while (! dataStoreOperQueue.isEmpty()) {
225            dataStoreOperQueue.poll();
226        }
227    }
228
229    @Override
230    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
231            final Throwable cause) {
232        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
233    }
234
235    @Override
236    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
237        // NOOP
238    }
239
240    @Override
241    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
242        for (final StatPermCollector collector : statCollectors) {
243            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
244                return true;
245            }
246        }
247        return false;
248    }
249
250    @Override
251    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
252        for (final StatPermCollector collector : statCollectors) {
253            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
254                collector.collectNextStatistics();
255            }
256        }
257    }
258
259    @Override
260    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
261            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
262        for (final StatPermCollector collector : statCollectors) {
263            if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
264                return;
265            }
266        }
267        synchronized (statCollectorLock) {
268            for (final StatPermCollector collector : statCollectors) {
269                if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
270                    return;
271                }
272            }
273            final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
274                    minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors);
275            final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
276            newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
277            statCollectorsNew.add(newCollector);
278            statCollectors = Collections.unmodifiableList(statCollectorsNew);
279        }
280    }
281
282    @Override
283    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
284        flowListeningCommiter.cleanForDisconnect(nodeIdent);
285
286        for (final StatPermCollector collector : statCollectors) {
287            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
288                if ( ! collector.hasActiveNodes()) {
289                    synchronized (statCollectorLock) {
290                        if (collector.hasActiveNodes()) {
291                            return;
292                        }
293                        final List<StatPermCollector> newStatColl =
294                                new ArrayList<>(statCollectors);
295                        newStatColl.remove(collector);
296                        statCollectors = Collections.unmodifiableList(newStatColl);
297                    }
298                }
299                return;
300            }
301        }
302        LOG.debug("Node {} has not removed.", nodeIdent);
303    }
304
305    /* Getter internal Statistic Manager Job Classes */
306    @Override
307    public StatRpcMsgManager getRpcMsgManager() {
308        return rpcMsgManager;
309    }
310
311    @Override
312    public StatNodeRegistration getNodeRegistrator() {
313        return nodeRegistrator;
314    }
315
316    @Override
317    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
318        return flowListeningCommiter;
319    }
320
321    @Override
322    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
323        return meterListeningCommiter;
324    }
325
326    @Override
327    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
328        return groupListeningCommiter;
329    }
330
331    @Override
332    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
333        return queueNotifyCommiter;
334    }
335
336
337    @Override
338    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
339        return tableNotifCommiter;
340    }
341
342    @Override
343    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
344        return portNotifyCommiter;
345    }
346 }
347