Merge "Fail on validation of checkstyle set to true"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingDeque;
18 import java.util.concurrent.ThreadFactory;
19
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
26 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
27 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
28 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
29 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
30 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
31 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
32 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
33 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
34 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.base.Preconditions;
52 import com.google.common.util.concurrent.ThreadFactoryBuilder;
53
54 /**
55 * statistics-manager
56 * org.opendaylight.controller.md.statistics.manager.impl
57 *
58 * StatisticsManagerImpl
59 * It represent a central point for whole module. Implementation
60 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
61 * Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
62 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
63 * In next, StatisticsManager provides all DS contact Transaction services.
64 *
65 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
66 *
67 */
68 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
69
70    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
71
72    private static final int QUEUE_DEPTH = 5000;
73    private static final int MAX_BATCH = 100;
74
75    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
76
77    private final DataBroker dataBroker;
78    private final ExecutorService statRpcMsgManagerExecutor;
79    private final ExecutorService statDataStoreOperationServ;
80    private StatRpcMsgManager rpcMsgManager;
81    private List<StatPermCollector> statCollectors;
82    private final Object statCollectorLock = new Object();
83    private BindingTransactionChain txChain;
84    private volatile boolean finishing = false;
85
86    private StatNodeRegistration nodeRegistrator;
87    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
88    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
89    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
90    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
91    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
92    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
93
94    private final StatisticsManagerConfig statManagerConfig;
95
96    public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
97        statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
98        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
99        ThreadFactory threadFact;
100        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
101        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
102        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
103        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
104        txChain =  dataBroker.createTransactionChain(this);
105    }
106
107    @Override
108    public void start(final NotificationProviderService notifService,
109            final RpcConsumerRegistry rpcRegistry) {
110        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
111        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
112        statCollectors = Collections.emptyList();
113        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
114        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
115        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
116        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
117        tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
118        portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
119        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
120
121        statRpcMsgManagerExecutor.execute(rpcMsgManager);
122        statDataStoreOperationServ.execute(this);
123        LOG.info("Statistics Manager started successfully!");
124    }
125
126    @Override
127    public void close() throws Exception {
128        LOG.info("StatisticsManager close called");
129        finishing = true;
130        if (nodeRegistrator != null) {
131            nodeRegistrator.close();
132            nodeRegistrator = null;
133        }
134        if (flowListeningCommiter != null) {
135            flowListeningCommiter.close();
136            flowListeningCommiter = null;
137        }
138        if (meterListeningCommiter != null) {
139            meterListeningCommiter.close();
140            meterListeningCommiter = null;
141        }
142        if (groupListeningCommiter != null) {
143            groupListeningCommiter.close();
144            groupListeningCommiter = null;
145        }
146        if (tableNotifCommiter != null) {
147            tableNotifCommiter.close();
148            tableNotifCommiter = null;
149        }
150        if (portNotifyCommiter != null) {
151            portNotifyCommiter.close();
152            portNotifyCommiter = null;
153        }
154        if (queueNotifyCommiter != null) {
155            queueNotifyCommiter.close();
156            queueNotifyCommiter = null;
157        }
158        if (statCollectors != null) {
159            for (StatPermCollector collector : statCollectors) {
160                collector.close();
161                collector = null;
162            }
163            statCollectors = null;
164        }
165        if (rpcMsgManager != null) {
166            rpcMsgManager.close();
167            rpcMsgManager = null;
168        }
169        statRpcMsgManagerExecutor.shutdown();
170        statDataStoreOperationServ.shutdown();
171        if (txChain != null) {
172            txChain.close();
173            txChain = null;
174        }
175    }
176
177    @Override
178    public void enqueue(final StatDataStoreOperation op) {
179        // we don't need to block anything - next statistics come soon
180        final boolean success = dataStoreOperQueue.offer(op);
181        if ( ! success) {
182            LOG.debug("Stat DS/Operational submiter Queue is full!");
183        }
184    }
185
186    @Override
187    public void run() {
188        /* Neverending cyle - wait for finishing */
189        while ( ! finishing) {
190            try {
191                StatDataStoreOperation op = dataStoreOperQueue.take();
192                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
193                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
194
195                int ops = 0;
196                do {
197                    op.applyOperation(tx);
198
199                    ops++;
200                    if (ops < MAX_BATCH) {
201                        op = dataStoreOperQueue.poll();
202                    } else {
203                        op = null;
204                    }
205                } while (op != null);
206
207                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
208
209                    tx.submit().checkedGet();
210            } catch (final InterruptedException e) {
211                LOG.warn("Stat Manager DS Operation thread interupted!", e);
212                finishing = true;
213            } catch (final Exception e) {
214                LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
215                txChain.close();
216                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
217                cleanDataStoreOperQueue();
218            }
219        }
220        // Drain all events, making sure any blocked threads are unblocked
221        cleanDataStoreOperQueue();
222    }
223
224    private synchronized void cleanDataStoreOperQueue() {
225        // Drain all events, making sure any blocked threads are unblocked
226        while (! dataStoreOperQueue.isEmpty()) {
227            StatDataStoreOperation op = dataStoreOperQueue.poll();
228
229            // Execute the node removal clean up operation if queued in the
230            // operational queue.
231            if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
232                try {
233                    LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
234                    op.applyOperation(null);
235                } catch (final Exception ex) {
236                    LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
237                }
238            }
239        }
240    }
241
242    @Override
243    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
244            final Throwable cause) {
245        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
246    }
247
248    @Override
249    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
250        // NOOP
251    }
252
253    @Override
254    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
255        for (final StatPermCollector collector : statCollectors) {
256            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
257                return true;
258            }
259        }
260        return false;
261    }
262
263    @Override
264    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
265        for (final StatPermCollector collector : statCollectors) {
266            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
267                collector.collectNextStatistics(xid);
268            }
269        }
270    }
271
272    @Override
273    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
274            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
275        for (final StatPermCollector collector : statCollectors) {
276            if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
277                return;
278            }
279        }
280        synchronized (statCollectorLock) {
281            for (final StatPermCollector collector : statCollectors) {
282                if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
283                    return;
284                }
285            }
286            final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
287                    statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
288                    statManagerConfig.getMaxNodesForCollector());
289            final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
290            newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
291            statCollectorsNew.add(newCollector);
292            statCollectors = Collections.unmodifiableList(statCollectorsNew);
293        }
294    }
295
296    @Override
297    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
298        flowListeningCommiter.cleanForDisconnect(nodeIdent);
299
300        for (final StatPermCollector collector : statCollectors) {
301            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
302                if ( ! collector.hasActiveNodes()) {
303                    synchronized (statCollectorLock) {
304                        if (collector.hasActiveNodes()) {
305                            return;
306                        }
307                        final List<StatPermCollector> newStatColl =
308                                new ArrayList<>(statCollectors);
309                        newStatColl.remove(collector);
310                        statCollectors = Collections.unmodifiableList(newStatColl);
311                    }
312                }
313                return;
314            }
315        }
316        LOG.debug("Node {} has not been removed.", nodeIdent);
317    }
318
319    @Override
320    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
321            final StatCapabTypes statCapab) {
322        for (final StatPermCollector collector : statCollectors) {
323            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
324                return;
325            }
326        }
327        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
328    }
329
330    /* Getter internal Statistic Manager Job Classes */
331    @Override
332    public StatRpcMsgManager getRpcMsgManager() {
333        return rpcMsgManager;
334    }
335
336    @Override
337    public StatNodeRegistration getNodeRegistrator() {
338        return nodeRegistrator;
339    }
340
341    @Override
342    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
343        return flowListeningCommiter;
344    }
345
346    @Override
347    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
348        return meterListeningCommiter;
349    }
350
351    @Override
352    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
353        return groupListeningCommiter;
354    }
355
356    @Override
357    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
358        return queueNotifyCommiter;
359    }
360
361
362    @Override
363    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
364        return tableNotifCommiter;
365    }
366
367    @Override
368    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
369        return portNotifyCommiter;
370    }
371
372     @Override
373     public StatisticsManagerConfig getConfiguration() {
374         return statManagerConfig;
375     }
376 }
377