Merge "Fix allowable Unix ports range 1024 - 65535"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
19 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
20 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
21 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
22 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
23 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
24 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
25 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
26 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.BlockingQueue;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.LinkedBlockingDeque;
49 import java.util.concurrent.ThreadFactory;
50
51 /**
52 * statistics-manager
53 * org.opendaylight.controller.md.statistics.manager.impl
54 *
55 * StatisticsManagerImpl
56 * It represent a central point for whole module. Implementation
57 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
58 * Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
59 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
60 * In next, StatisticsManager provides all DS contact Transaction services.
61 *
62 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
63 *
64 */
65 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
66
67    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
68
69    private static final int QUEUE_DEPTH = 5000;
70    private static final int MAX_BATCH = 100;
71
72    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
73
74    private final DataBroker dataBroker;
75    private final ExecutorService statRpcMsgManagerExecutor;
76    private final ExecutorService statDataStoreOperationServ;
77    private StatRpcMsgManager rpcMsgManager;
78    private List<StatPermCollector> statCollectors;
79    private final Object statCollectorLock = new Object();
80    private BindingTransactionChain txChain;
81    private volatile boolean finishing = false;
82
83    private StatNodeRegistration nodeRegistrator;
84    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
85    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
86    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
87    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
88    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
89    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
90
91    private final StatisticsManagerConfig statManagerConfig;
92
93    public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
94        this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
95        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
96        ThreadFactory threadFact;
97        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
98        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
99        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
100        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
101        txChain =  dataBroker.createTransactionChain(this);
102    }
103
104    @Override
105    public void start(final NotificationProviderService notifService,
106            final RpcConsumerRegistry rpcRegistry) {
107        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
108        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
109        statCollectors = Collections.emptyList();
110        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
111        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
112        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
113        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
114        tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
115        portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
116        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
117
118        statRpcMsgManagerExecutor.execute(rpcMsgManager);
119        statDataStoreOperationServ.execute(this);
120        LOG.info("Statistics Manager started successfully!");
121    }
122
123    @Override
124    public void close() throws Exception {
125        LOG.info("StatisticsManager close called");
126        finishing = true;
127        if (nodeRegistrator != null) {
128            nodeRegistrator.close();
129            nodeRegistrator = null;
130        }
131        if (flowListeningCommiter != null) {
132            flowListeningCommiter.close();
133            flowListeningCommiter = null;
134        }
135        if (meterListeningCommiter != null) {
136            meterListeningCommiter.close();
137            meterListeningCommiter = null;
138        }
139        if (groupListeningCommiter != null) {
140            groupListeningCommiter.close();
141            groupListeningCommiter = null;
142        }
143        if (tableNotifCommiter != null) {
144            tableNotifCommiter.close();
145            tableNotifCommiter = null;
146        }
147        if (portNotifyCommiter != null) {
148            portNotifyCommiter.close();
149            portNotifyCommiter = null;
150        }
151        if (queueNotifyCommiter != null) {
152            queueNotifyCommiter.close();
153            queueNotifyCommiter = null;
154        }
155        if (statCollectors != null) {
156            for (StatPermCollector collector : statCollectors) {
157                collector.close();
158                collector = null;
159            }
160            statCollectors = null;
161        }
162        if (rpcMsgManager != null) {
163            rpcMsgManager.close();
164            rpcMsgManager = null;
165        }
166        statRpcMsgManagerExecutor.shutdown();
167        statDataStoreOperationServ.shutdown();
168        if (txChain != null) {
169            txChain.close();
170            txChain = null;
171        }
172    }
173
174    @Override
175    public void enqueue(final StatDataStoreOperation op) {
176        // we don't need to block anything - next statistics come soon
177        final boolean success = dataStoreOperQueue.offer(op);
178        if ( ! success) {
179            LOG.debug("Stat DS/Operational submiter Queue is full!");
180        }
181    }
182
183    @Override
184    public void run() {
185        /* Neverending cyle - wait for finishing */
186        while ( ! finishing) {
187            try {
188                StatDataStoreOperation op = dataStoreOperQueue.take();
189                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
190                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
191
192                int ops = 0;
193                do {
194                    op.applyOperation(tx);
195
196                    ops++;
197                    if (ops < MAX_BATCH) {
198                        op = dataStoreOperQueue.poll();
199                    } else {
200                        op = null;
201                    }
202                } while (op != null);
203
204                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
205
206                    tx.submit().checkedGet();
207            } catch (final InterruptedException e) {
208                LOG.warn("Stat Manager DS Operation thread interupted!", e);
209                finishing = true;
210            } catch (final Exception e) {
211                LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
212                txChain.close();
213                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
214                cleanDataStoreOperQueue();
215            }
216        }
217        // Drain all events, making sure any blocked threads are unblocked
218        cleanDataStoreOperQueue();
219    }
220
221    private synchronized void cleanDataStoreOperQueue() {
222        // Drain all events, making sure any blocked threads are unblocked
223        while (! dataStoreOperQueue.isEmpty()) {
224            dataStoreOperQueue.poll();
225        }
226    }
227
228    @Override
229    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
230            final Throwable cause) {
231        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
232    }
233
234    @Override
235    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
236        // NOOP
237    }
238
239    @Override
240    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
241        for (final StatPermCollector collector : statCollectors) {
242            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
243                return true;
244            }
245        }
246        return false;
247    }
248
249    @Override
250    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
251        for (final StatPermCollector collector : statCollectors) {
252            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
253                collector.collectNextStatistics();
254            }
255        }
256    }
257
258    @Override
259    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
260            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
261        for (final StatPermCollector collector : statCollectors) {
262            if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
263                return;
264            }
265        }
266        synchronized (statCollectorLock) {
267            for (final StatPermCollector collector : statCollectors) {
268                if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
269                    return;
270                }
271            }
272            final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
273                    statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
274                    statManagerConfig.getMaxNodesForCollector());
275            final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
276            newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
277            statCollectorsNew.add(newCollector);
278            statCollectors = Collections.unmodifiableList(statCollectorsNew);
279        }
280    }
281
282    @Override
283    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
284        flowListeningCommiter.cleanForDisconnect(nodeIdent);
285
286        for (final StatPermCollector collector : statCollectors) {
287            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
288                if ( ! collector.hasActiveNodes()) {
289                    synchronized (statCollectorLock) {
290                        if (collector.hasActiveNodes()) {
291                            return;
292                        }
293                        final List<StatPermCollector> newStatColl =
294                                new ArrayList<>(statCollectors);
295                        newStatColl.remove(collector);
296                        statCollectors = Collections.unmodifiableList(newStatColl);
297                    }
298                }
299                return;
300            }
301        }
302        LOG.debug("Node {} has not been removed.", nodeIdent);
303    }
304
305    @Override
306    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
307            final StatCapabTypes statCapab) {
308        for (final StatPermCollector collector : statCollectors) {
309            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
310                return;
311            }
312        }
313        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
314    }
315
316    /* Getter internal Statistic Manager Job Classes */
317    @Override
318    public StatRpcMsgManager getRpcMsgManager() {
319        return rpcMsgManager;
320    }
321
322    @Override
323    public StatNodeRegistration getNodeRegistrator() {
324        return nodeRegistrator;
325    }
326
327    @Override
328    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
329        return flowListeningCommiter;
330    }
331
332    @Override
333    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
334        return meterListeningCommiter;
335    }
336
337    @Override
338    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
339        return groupListeningCommiter;
340    }
341
342    @Override
343    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
344        return queueNotifyCommiter;
345    }
346
347
348    @Override
349    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
350        return tableNotifCommiter;
351    }
352
353    @Override
354    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
355        return portNotifyCommiter;
356    }
357
358     @Override
359     public StatisticsManagerConfig getConfiguration() {
360         return statManagerConfig;
361     }
362 }
363