2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingDeque;
18 import java.util.concurrent.ThreadFactory;
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
26 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
27 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
28 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
29 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
30 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
31 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
32 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
33 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
34 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import com.google.common.base.Preconditions;
52 import com.google.common.util.concurrent.ThreadFactoryBuilder;
56 * org.opendaylight.controller.md.statistics.manager.impl
58 * StatisticsManagerImpl
59 * It represent a central point for whole module. Implementation
60 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
61 * Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
62 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
63 * In next, StatisticsManager provides all DS contact Transaction services.
65 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
68 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
70 private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
72 private static final int QUEUE_DEPTH = 5000;
73 private static final int MAX_BATCH = 100;
75 private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
77 private final DataBroker dataBroker;
78 private final ExecutorService statRpcMsgManagerExecutor;
79 private final ExecutorService statDataStoreOperationServ;
80 private StatRpcMsgManager rpcMsgManager;
81 private List<StatPermCollector> statCollectors;
82 private final Object statCollectorLock = new Object();
83 private BindingTransactionChain txChain;
84 private volatile boolean finishing = false;
86 private StatNodeRegistration nodeRegistrator;
87 private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
88 private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
89 private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
90 private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
91 private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
92 private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
94 private final StatisticsManagerConfig statManagerConfig;
96 public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
97 statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
98 this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
99 ThreadFactory threadFact;
100 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
101 statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
102 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
103 statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
104 txChain = dataBroker.createTransactionChain(this);
108 public void start(final NotificationProviderService notifService,
109 final RpcConsumerRegistry rpcRegistry) {
110 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
111 rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
112 statCollectors = Collections.emptyList();
113 nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
114 flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
115 meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
116 groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
117 tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
118 portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
119 queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
121 statRpcMsgManagerExecutor.execute(rpcMsgManager);
122 statDataStoreOperationServ.execute(this);
123 LOG.info("Statistics Manager started successfully!");
127 public void close() throws Exception {
128 LOG.info("StatisticsManager close called");
130 if (nodeRegistrator != null) {
131 nodeRegistrator.close();
132 nodeRegistrator = null;
134 if (flowListeningCommiter != null) {
135 flowListeningCommiter.close();
136 flowListeningCommiter = null;
138 if (meterListeningCommiter != null) {
139 meterListeningCommiter.close();
140 meterListeningCommiter = null;
142 if (groupListeningCommiter != null) {
143 groupListeningCommiter.close();
144 groupListeningCommiter = null;
146 if (tableNotifCommiter != null) {
147 tableNotifCommiter.close();
148 tableNotifCommiter = null;
150 if (portNotifyCommiter != null) {
151 portNotifyCommiter.close();
152 portNotifyCommiter = null;
154 if (queueNotifyCommiter != null) {
155 queueNotifyCommiter.close();
156 queueNotifyCommiter = null;
158 if (statCollectors != null) {
159 for (StatPermCollector collector : statCollectors) {
163 statCollectors = null;
165 if (rpcMsgManager != null) {
166 rpcMsgManager.close();
167 rpcMsgManager = null;
169 statRpcMsgManagerExecutor.shutdown();
170 statDataStoreOperationServ.shutdown();
171 if (txChain != null) {
178 public void enqueue(final StatDataStoreOperation op) {
179 // we don't need to block anything - next statistics come soon
180 final boolean success = dataStoreOperQueue.offer(op);
182 LOG.debug("Stat DS/Operational submiter Queue is full!");
188 /* Neverending cyle - wait for finishing */
189 while ( ! finishing) {
191 StatDataStoreOperation op = dataStoreOperQueue.take();
192 final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
193 LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
197 op.applyOperation(tx);
200 if (ops < MAX_BATCH) {
201 op = dataStoreOperQueue.poll();
205 } while (op != null);
207 LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
209 tx.submit().checkedGet();
210 } catch (final InterruptedException e) {
211 LOG.warn("Stat Manager DS Operation thread interupted!", e);
213 } catch (final Exception e) {
214 LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
216 txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
217 cleanDataStoreOperQueue();
220 // Drain all events, making sure any blocked threads are unblocked
221 cleanDataStoreOperQueue();
224 private synchronized void cleanDataStoreOperQueue() {
225 // Drain all events, making sure any blocked threads are unblocked
226 while (! dataStoreOperQueue.isEmpty()) {
227 StatDataStoreOperation op = dataStoreOperQueue.poll();
229 // Execute the node removal clean up operation if queued in the
230 // operational queue.
231 if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
233 LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
234 op.applyOperation(null);
235 } catch (final Exception ex) {
236 LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
243 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
244 final Throwable cause) {
245 LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
249 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
254 public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
255 for (final StatPermCollector collector : statCollectors) {
256 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
264 public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
265 for (final StatPermCollector collector : statCollectors) {
266 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
267 collector.collectNextStatistics(xid);
273 public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
274 final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
275 for (final StatPermCollector collector : statCollectors) {
276 if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
280 synchronized (statCollectorLock) {
281 for (final StatPermCollector collector : statCollectors) {
282 if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
286 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
287 statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
288 statManagerConfig.getMaxNodesForCollector());
289 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
290 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
291 statCollectorsNew.add(newCollector);
292 statCollectors = Collections.unmodifiableList(statCollectorsNew);
297 public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
298 flowListeningCommiter.cleanForDisconnect(nodeIdent);
300 for (final StatPermCollector collector : statCollectors) {
301 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
302 if ( ! collector.hasActiveNodes()) {
303 synchronized (statCollectorLock) {
304 if (collector.hasActiveNodes()) {
307 final List<StatPermCollector> newStatColl =
308 new ArrayList<>(statCollectors);
309 newStatColl.remove(collector);
310 statCollectors = Collections.unmodifiableList(newStatColl);
316 LOG.debug("Node {} has not been removed.", nodeIdent);
320 public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
321 final StatCapabTypes statCapab) {
322 for (final StatPermCollector collector : statCollectors) {
323 if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
327 LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
330 /* Getter internal Statistic Manager Job Classes */
332 public StatRpcMsgManager getRpcMsgManager() {
333 return rpcMsgManager;
337 public StatNodeRegistration getNodeRegistrator() {
338 return nodeRegistrator;
342 public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
343 return flowListeningCommiter;
347 public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
348 return meterListeningCommiter;
352 public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
353 return groupListeningCommiter;
357 public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
358 return queueNotifyCommiter;
363 public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
364 return tableNotifCommiter;
368 public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
369 return portNotifyCommiter;
373 public StatisticsManagerConfig getConfiguration() {
374 return statManagerConfig;