2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
19 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
20 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
21 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
22 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
23 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
24 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
25 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
26 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.BlockingQueue;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.LinkedBlockingDeque;
49 import java.util.concurrent.ThreadFactory;
53 * org.opendaylight.controller.md.statistics.manager.impl
55 * StatisticsManagerImpl
56 * It represents the central point of the whole module. This implementation of
57 * {@link StatisticsManager} registers all Operational/DS {@link StatNotifyCommiter} and
58 * Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
59 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
60 * In addition, StatisticsManager provides all DS transaction services.
62 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
// Central statistics manager implementation. It is also a Runnable: start() hands
// this instance to the data store operation executor.
65 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
67 private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
// Capacity bound of dataStoreOperQueue.
69 private static final int QUEUE_DEPTH = 5000;
// Limit that the processing loop compares its per-transaction operation counter against.
70 private static final int MAX_BATCH = 100;
// Bounded queue of pending data store operations: filled by enqueue() via offer(),
// consumed by the processing loop via take() and poll().
72 private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
// Broker used to create transaction chains; also passed to the listening commiters.
74 private final DataBroker dataBroker;
// Single-thread executor that runs rpcMsgManager.
75 private final ExecutorService statRpcMsgManagerExecutor;
// Single-thread executor that runs this manager (the data store operation loop).
76 private final ExecutorService statDataStoreOperationServ;
// Created in start(), closed and cleared in close().
77 private StatRpcMsgManager rpcMsgManager;
// The collector list is replaced as a whole by a new unmodifiable copy while holding
// statCollectorLock; the iterating readers do not take the lock.
// NOTE(review): the field is not volatile, so unlocked readers are not guaranteed to
// see the newest list - confirm this is acceptable.
78 private List<StatPermCollector> statCollectors;
// Monitor guarding replacement of statCollectors.
79 private final Object statCollectorLock = new Object();
// Current transaction chain; replaced by a fresh chain after a processing failure.
80 private BindingTransactionChain txChain;
// Exit flag read by the processing loop; volatile for cross-thread visibility.
81 private volatile boolean finishing = false;
// The components below are instantiated in start() and released in close().
83 private StatNodeRegistration nodeRegistrator;
84 private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
85 private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
86 private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
87 private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
88 private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
89 private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
// Configuration supplied to the constructor; source of the monitor interval and the
// per-collector node limit used when collaborators are created.
91 private final StatisticsManagerConfig statManagerConfig;
/**
 * Creates the manager: stores the configuration and the data broker, builds the two
 * single-thread executors with named threads and opens the initial transaction chain.
 *
 * @param dataBroker broker used to create transaction chains; must not be null
 * @param statManagerconfig manager configuration; must not be null
 * @throws NullPointerException if either argument is null (raised by Preconditions.checkNotNull)
 */
93 public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
94 this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
95 this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
96 ThreadFactory threadFact;
// Thread name pattern for the executor that runs the RPC message manager.
97 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
98 statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
// Thread name pattern for the executor that runs the data store operation loop.
99 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
100 statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
// NOTE(review): this instance is handed out as the chain listener before the
// constructor has returned - confirm no callback can arrive that early.
101 txChain = dataBroker.createTransactionChain(this);
/**
 * Instantiates all collaborators (RPC message manager, node registrator, listening and
 * notify commiters), resets the collector list to empty and submits the RPC message
 * manager and this manager to their executors.
 *
 * @param notifService notification service passed to the registrator and the commiters;
 *        it is not null-checked in this method
 * @param rpcRegistry RPC registry passed to the RPC message manager; must not be null
 * @throws IllegalArgumentException if rpcRegistry is null (raised by Preconditions.checkArgument)
 */
105 public void start(final NotificationProviderService notifService,
106 final RpcConsumerRegistry rpcRegistry) {
107 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
108 rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
// Start with no collectors; connectedNodeRegistration() adds them on demand.
109 statCollectors = Collections.emptyList();
110 nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
111 flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
112 meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
113 groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
114 tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
115 portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
116 queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
// Hand both long-running jobs to their dedicated single-thread executors.
118 statRpcMsgManagerExecutor.execute(rpcMsgManager);
119 statDataStoreOperationServ.execute(this);
120 LOG.info("Statistics Manager started successfully!");
/**
 * Releases the collaborators created in start(): each non-null component is closed and
 * its field cleared, then both executors are shut down.
 *
 * @throws Exception declared by the signature; presumably propagated from the close()
 *         calls of the collaborators - confirm
 */
124 public void close() throws Exception {
125 LOG.info("StatisticsManager close called");
// Each component is closed only if it was created, then its reference is dropped.
127 if (nodeRegistrator != null) {
128 nodeRegistrator.close();
129 nodeRegistrator = null;
131 if (flowListeningCommiter != null) {
132 flowListeningCommiter.close();
133 flowListeningCommiter = null;
135 if (meterListeningCommiter != null) {
136 meterListeningCommiter.close();
137 meterListeningCommiter = null;
139 if (groupListeningCommiter != null) {
140 groupListeningCommiter.close();
141 groupListeningCommiter = null;
143 if (tableNotifCommiter != null) {
144 tableNotifCommiter.close();
145 tableNotifCommiter = null;
147 if (portNotifyCommiter != null) {
148 portNotifyCommiter.close();
149 portNotifyCommiter = null;
151 if (queueNotifyCommiter != null) {
152 queueNotifyCommiter.close();
153 queueNotifyCommiter = null;
// Visit every collector, then drop the list. The loop body is not visible in this
// view; presumably each collector is closed - confirm.
155 if (statCollectors != null) {
156 for (StatPermCollector collector : statCollectors) {
160 statCollectors = null;
162 if (rpcMsgManager != null) {
163 rpcMsgManager.close();
164 rpcMsgManager = null;
// shutdown() stops accepting new tasks; it does not interrupt tasks already running.
166 statRpcMsgManagerExecutor.shutdown();
167 statDataStoreOperationServ.shutdown();
// The body of this branch is not visible in this view; presumably the chain is
// closed and cleared - confirm.
168 if (txChain != null) {
/**
 * Adds a data store operation to the bounded queue without blocking the caller.
 *
 * @param op operation to queue for the processing loop
 */
175 public void enqueue(final StatDataStoreOperation op) {
176 // we don't need to block anything - next statistics come soon
// offer() returns false instead of waiting when the queue is at capacity.
177 final boolean success = dataStoreOperQueue.offer(op);
// The guard around this log line is not visible in this view; presumably it is
// reached only when offer() returned false - confirm.
179 LOG.debug("Stat DS/Operational submiter Queue is full!");
// Processing loop of the data store operation thread. The enclosing method header is
// not part of this view; presumably this is the body of run() - confirm.
185 /* Never-ending cycle - wait for finishing */
186 while ( ! finishing) {
// Block until at least one operation is queued, then open one transaction for it.
188 StatDataStoreOperation op = dataStoreOperQueue.take();
189 final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
190 LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
// Apply the current operation to the open transaction.
194 op.applyOperation(tx);
// While the ops counter is below MAX_BATCH, fetch the next queued operation without
// blocking; poll() returns null on an empty queue, which ends the do/while below.
// The declaration and update of the ops counter are not visible in this view.
197 if (ops < MAX_BATCH) {
198 op = dataStoreOperQueue.poll();
202 } while (op != null);
204 LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
// Submit the batch and wait for the outcome; a failed submit surfaces as an exception.
206 tx.submit().checkedGet();
207 } catch (final InterruptedException e) {
208 LOG.warn("Stat Manager DS Operation thread interupted!", e);
// Any other failure: create a fresh transaction chain and discard everything queued.
210 } catch (final Exception e) {
211 LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
213 txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
214 cleanDataStoreOperQueue();
217 // Drain all events, making sure any blocked threads are unblocked
218 cleanDataStoreOperQueue();
/**
 * Empties the data store operation queue by polling until it reports empty; the
 * removed operations are discarded without being applied. The method is synchronized
 * on this manager instance.
 */
221 private synchronized void cleanDataStoreOperQueue() {
222 // Drain all events, making sure any blocked threads are unblocked
223 while (! dataStoreOperQueue.isEmpty()) {
224 dataStoreOperQueue.poll();
/**
 * Transaction chain failure callback: logs a warning naming the failed transaction.
 * No further handling is visible in this view.
 *
 * @param chain chain that failed; not used by the visible code
 * @param transaction transaction whose identifier is logged
 * @param cause failure cause, passed to the logger as the throwable
 */
229 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
230 final Throwable cause) {
231 LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
/**
 * Transaction chain success callback, the counterpart of onTransactionChainFailed.
 * The body is not visible in this view.
 *
 * @param chain chain that completed successfully
 */
235 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
/**
 * Asks each collector in turn whether it reports the given node as active.
 * The return statements are not visible in this view; presumably the result is true
 * on the first positive answer and false otherwise - confirm.
 *
 * @param nodeIdent identifier of the node to look up
 * @return whether a collector reports the node as active (see hedge above)
 */
240 public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
// Iterates the current collector list without taking statCollectorLock.
241 for (final StatPermCollector collector : statCollectors) {
242 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
/**
 * Triggers the next statistics collection on a collector that reports the given node
 * as active. Whether iteration stops after the first match is not visible in this view.
 *
 * @param nodeIdent identifier of the node whose collector should continue collecting
 */
250 public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
251 for (final StatPermCollector collector : statCollectors) {
252 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
253 collector.collectNextStatistics();
/**
 * Registers a connected node with a collector. Existing collectors are offered the
 * node first without locking, then again while holding statCollectorLock; after that a
 * new collector is created, given the node and appended to a fresh unmodifiable copy
 * of the collector list. The actions taken when an existing collector accepts the node
 * are not visible in this view; presumably the method returns early - confirm.
 *
 * @param nodeIdent identifier of the connected node
 * @param statTypes statistic capability types, forwarded to the collector
 * @param nrOfSwitchTables table count, forwarded to the collector
 */
259 public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
260 final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
// First attempt: offer the node to the current collectors without locking.
261 for (final StatPermCollector collector : statCollectors) {
262 if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
266 synchronized (statCollectorLock) {
// Second attempt under the lock, in case the list changed in the meantime.
267 for (final StatPermCollector collector : statCollectors) {
268 if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
// The new collector receives the current list size plus one as its third argument.
272 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
273 statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
274 statManagerConfig.getMaxNodesForCollector());
// Build a new list and publish it as unmodifiable instead of mutating in place.
275 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
276 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
277 statCollectorsNew.add(newCollector);
278 statCollectors = Collections.unmodifiableList(statCollectorsNew);
/**
 * Handles a node disconnect: asks the flow listening commiter to clean up for the node,
 * then unregisters the node from the collector that holds it. A collector left without
 * active nodes is removed from the collector list via a fresh unmodifiable copy.
 *
 * @param nodeIdent identifier of the disconnected node
 */
283 public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
284 flowListeningCommiter.cleanForDisconnect(nodeIdent);
286 for (final StatPermCollector collector : statCollectors) {
287 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
288 if ( ! collector.hasActiveNodes()) {
289 synchronized (statCollectorLock) {
// Re-check under the lock. The action taken when the collector has gained nodes again
// is not visible in this view; presumably the removal is skipped - confirm.
290 if (collector.hasActiveNodes()) {
// Remove the idle collector from a copy and publish the copy as unmodifiable.
293 final List<StatPermCollector> newStatColl =
294 new ArrayList<>(statCollectors);
295 newStatColl.remove(collector);
296 statCollectors = Collections.unmodifiableList(newStatColl);
// Presumably reached only when no collector unregistered the node - confirm.
302 LOG.debug("Node {} has not been removed.", nodeIdent);
/**
 * Offers an additional statistic capability for a node to each collector in turn.
 * The action taken when a collector accepts is not visible in this view; presumably
 * the method returns and the debug line is reached only when none accepted - confirm.
 *
 * @param nodeIdent identifier of the node being extended
 * @param statCapab capability type to register for the node
 */
306 public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
307 final StatCapabTypes statCapab) {
308 for (final StatPermCollector collector : statCollectors) {
309 if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
313 LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
316 /* Getters for the internal Statistics Manager job classes */
// Returns the RPC message manager created in start(); null before start() and after close().
318 public StatRpcMsgManager getRpcMsgManager() {
319 return rpcMsgManager;
// Returns the node registrator created in start(); null before start() and after close().
323 public StatNodeRegistration getNodeRegistrator() {
324 return nodeRegistrator;
// Returns the flow listening commiter created in start(); null before start() and after close().
328 public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
329 return flowListeningCommiter;
// Returns the meter listening commiter created in start(); null before start() and after close().
333 public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
334 return meterListeningCommiter;
// Returns the group listening commiter created in start(); null before start() and after close().
338 public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
339 return groupListeningCommiter;
// Returns the queue commiter created in start(); null before start() and after close().
343 public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
344 return queueNotifyCommiter;
// Returns the table notify commiter created in start(); null before start() and after close().
349 public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
350 return tableNotifCommiter;
// Returns the port notify commiter created in start(); null before start() and after close().
354 public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
355 return portNotifyCommiter;
// Returns the configuration object supplied to the constructor; never null.
359 public StatisticsManagerConfig getConfiguration() {
360 return statManagerConfig;