2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
17 import java.util.UUID;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingDeque;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
32 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
33 import org.opendaylight.openflowplugin.applications.statistics.manager.StatListeningCommiter;
34 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
35 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
36 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
37 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
38 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
39 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
/**
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
 *
 * StatisticsManagerImpl
 * It represents the central point of the whole module. This implementation of
 * {@link StatisticsManager} registers all Operational/DS {@link StatNotifyCommiter}s and
 * Config/DS {@link StatListeningCommiter}s, as well as the {@link StatPermCollector}s
 * used for statistics collection and the {@link StatRpcMsgManager} as the device RPC provider.
 * In addition, StatisticsManager provides all datastore transaction services.
 *
 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
 */
70 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
72 private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
74 private static final int QUEUE_DEPTH = 5000;
75 private static final int MAX_BATCH = 100;
77 private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
78 private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
79 private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
82 private final DataBroker dataBroker;
83 private final ExecutorService statRpcMsgManagerExecutor;
84 private final ExecutorService statDataStoreOperationServ;
85 private EntityOwnershipService ownershipService;
86 private StatRpcMsgManager rpcMsgManager;
87 private List<StatPermCollector> statCollectors;
88 private final Object statCollectorLock = new Object();
89 private BindingTransactionChain txChain;
90 private volatile boolean finishing = false;
92 private StatNodeRegistration nodeRegistrator;
93 private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
94 private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
95 private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
96 private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
97 private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
98 private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
100 private final StatisticsManagerConfig statManagerConfig;
102 public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
103 statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
104 this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
105 ThreadFactory threadFact;
106 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
107 statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
108 threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
109 statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
110 txChain = dataBroker.createTransactionChain(this);
114 public void start(final NotificationProviderService notifService,
115 final RpcConsumerRegistry rpcRegistry) {
116 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
117 rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
118 statCollectors = Collections.emptyList();
119 nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
120 flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
121 meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
122 groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
123 tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
124 portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
125 queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
127 statRpcMsgManagerExecutor.execute(rpcMsgManager);
128 statDataStoreOperationServ.execute(this);
129 LOG.info("Statistics Manager started successfully!");
/**
 * Null-guarded helper used by the public close() as {@code field = close(field)}.
 *
 * NOTE(review): only the null check is visible in this chunk. Presumably the argument is
 * closed inside the guard and the returned value is used to clear the caller's field -
 * confirm against the full source.
 *
 * @param closeable resource to handle; may be null
 * @param <T> concrete resource type
 * @throws Exception declared because AutoCloseable.close() may throw
 */
132 private <T extends AutoCloseable> T close(final T closeable) throws Exception {
133 if (closeable != null) {
/**
 * Shuts the manager down: passes the node registrator, every commiter, every collector and
 * the RPC message manager through the private close helper, requests shutdown of both
 * executors and finally closes the transaction chain.
 *
 * NOTE(review): as far as visible here, an exception from any single close call propagates
 * immediately and leaves the remaining resources untouched.
 * NOTE(review): no statement setting {@code finishing} is visible in this chunk, and
 * ExecutorService.shutdown() does not interrupt a worker blocked on the queue - confirm how
 * the datastore worker loop is told to stop.
 *
 * @throws Exception if closing any of the resources fails
 */
140 public void close() throws Exception {
141 LOG.info("StatisticsManager close called");
// Each field is overwritten with whatever the close helper returns.
143 nodeRegistrator = close(nodeRegistrator);
144 flowListeningCommiter = close(flowListeningCommiter);
145 meterListeningCommiter = close(meterListeningCommiter);
146 groupListeningCommiter = close(groupListeningCommiter);
147 tableNotifCommiter = close(tableNotifCommiter);
148 portNotifyCommiter = close(portNotifyCommiter);
149 queueNotifyCommiter = close(queueNotifyCommiter);
// statCollectors is null until start() has run and after a previous close().
150 if (statCollectors != null) {
151 for (StatPermCollector collector : statCollectors) {
// The assignment only affects the loop variable; the list itself is dropped below.
152 collector = close(collector);
154 statCollectors = null;
156 rpcMsgManager = close(rpcMsgManager);
// shutdown() stops accepting new tasks; already running tasks are not interrupted.
157 statRpcMsgManagerExecutor.shutdown();
158 statDataStoreOperationServ.shutdown();
159 txChain = close(txChain);
163 public void enqueue(final StatDataStoreOperation op) {
164 // we don't need to block anything - next statistics come soon
165 final boolean success = dataStoreOperQueue.offer(op);
167 LOG.debug("Stat DS/Operational submitter Queue is full!");
// Worker loop that consumes queued datastore operations. The class implements Runnable and
// start() submits this instance to statDataStoreOperationServ, so this is presumably the
// body of run() - the method signature is not visible in this chunk.
// Each pass blocks for one operation, opens a transaction on txChain, applies operations to
// it while more are queued and the MAX_BATCH check passes, and then submits the transaction.
173 /* Never-ending cycle - loops until finishing is set */
174 while ( ! finishing) {
175 StatDataStoreOperation op = null;
// take() blocks until an operation is available.
177 op = dataStoreOperQueue.take();
178 final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
179 LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
// Look up the registration of the node the operation belongs to.
183 Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
184 if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
185 // Operations are applied only while the node is still registered and its UUID matches.
186 // They are not applied for nodes which have been disconnected; this can happen if operations are queued and the node is removed.
187 // If the UUIDs don't match, the operation is stale: it belongs to the same node,
188 // which got disconnected and connected again.
189 op.applyOperation(tx);
192 LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
// While the batch limit is not reached, poll() fetches the next operation without blocking
// (null when the queue is empty, which ends the batch).
195 if (ops < MAX_BATCH) {
196 op = dataStoreOperQueue.poll();
200 } while (op != null);
202 LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
// checkedGet() waits for the submit result and throws if the commit failed.
204 tx.submit().checkedGet();
205 } catch (final InterruptedException e) {
206 LOG.warn("Stat Manager DS Operation thread interrupted, while " +
207 "waiting for StatDataStore Operation task!", e);
// Any other failure: replace the transaction chain and discard everything still queued.
// NOTE(review): no close of the previous chain is visible in this chunk - confirm.
209 } catch (final Exception e) {
210 LOG.warn("Unhandled exception during processing statistics for {}. " +
211 "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
213 txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
214 cleanDataStoreOperQueue();
217 // Drain all events, making sure any blocked threads are unblocked
218 cleanDataStoreOperQueue();
221 private synchronized void cleanDataStoreOperQueue() {
222 // Drain all events, making sure any blocked threads are unblocked
223 while (! dataStoreOperQueue.isEmpty()) {
224 dataStoreOperQueue.poll();
229 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
230 final Throwable cause) {
231 LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
/**
 * Callback for successful completion of a transaction chain.
 *
 * NOTE(review): no statements of this method are visible in this chunk - presumably a
 * no-op; confirm against the full source.
 *
 * @param chain the chain that completed
 */
235 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
240 public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
241 for (final StatPermCollector collector : statCollectors) {
242 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
250 public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
251 for (final StatPermCollector collector : statCollectors) {
252 if (collector.isProvidedFlowNodeActive(nodeIdent)) {
253 collector.collectNextStatistics(xid);
259 public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
260 final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
263 Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
264 if (collectorUUIDPair == null) {
265 // no collector contains this node,
266 // check if one of the collectors can accommodate it
267 // if no then add a new collector
269 synchronized(statCollectorLock) {
270 for (int i = statCollectors.size() - 1; i >= 0; i--) {
271 // start from back of the list as most likely previous ones might be full
272 final StatPermCollector aCollector = statCollectors.get(i);
273 if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
274 // if the collector returns true after adding node, then return
275 nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
276 LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
277 numNodesBeingCollected.incrementAndGet());
281 // no collector was able to add this node
282 LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
283 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
284 statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
285 statManagerConfig.getMaxNodesForCollector());
287 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
288 statCollectorsNew.add(newCollector);
289 statCollectors = Collections.unmodifiableList(statCollectorsNew);
290 nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
291 LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
293 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
298 // add to the collector, even if it rejects it.
299 collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
305 public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
306 flowListeningCommiter.cleanForDisconnect(nodeIdent);
308 Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
309 if (collectorUUIDPair != null) {
310 StatPermCollector collector = collectorUUIDPair.getLeft();
311 if (collector != null) {
312 nodeCollectorMap.remove(nodeIdent);
313 LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
315 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
316 if (!collector.hasActiveNodes()) {
317 synchronized (statCollectorLock) {
318 if (collector.hasActiveNodes()) {
321 final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
322 newStatColl.remove(collector);
323 statCollectors = Collections.unmodifiableList(newStatColl);
326 LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
328 LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
331 LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
335 LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
340 public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
341 final StatCapabTypes statCapab) {
342 for (final StatPermCollector collector : statCollectors) {
343 if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
347 LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
351 public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
352 final StatCapabTypes statCapab) {
353 for (final StatPermCollector collector : statCollectors) {
354 if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
358 LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
361 /* Getter internal Statistic Manager Job Classes */
363 public StatRpcMsgManager getRpcMsgManager() {
364 return rpcMsgManager;
368 public StatNodeRegistration getNodeRegistrator() {
369 return nodeRegistrator;
373 public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
374 return flowListeningCommiter;
378 public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
379 return meterListeningCommiter;
383 public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
384 return groupListeningCommiter;
388 public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
389 return queueNotifyCommiter;
394 public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
395 return tableNotifCommiter;
399 public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
400 return portNotifyCommiter;
404 public StatisticsManagerConfig getConfiguration() {
405 return statManagerConfig;
409 public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
410 Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
411 if (permCollectorUUIDPair != null) {
412 return permCollectorUUIDPair.getRight();
414 // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
415 return UUID.fromString("invalid-uuid");
419 public void setOwnershipService(EntityOwnershipService ownershipService) {
420 this.ownershipService = ownershipService;
424 public EntityOwnershipService getOwnershipService() {
425 return this.ownershipService;