package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* statistics-manager
* org.opendaylight.openflowplugin.applications.statistics.manager.impl
* StatisticsManagerImpl
* It represent a central point for whole module. Implementation
* {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
-* Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
+* Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
* for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
* In next, StatisticsManager provides all DS contact Transaction services.
*
private static final int MAX_BATCH = 100;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
+ private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
+ private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
- private final DataBroker dataBroker;
- private final ExecutorService statRpcMsgManagerExecutor;
+
+ private final DataBroker dataBroker;
private final ExecutorService statDataStoreOperationServ;
+ private EntityOwnershipService ownershipService;
private StatRpcMsgManager rpcMsgManager;
private List<StatPermCollector> statCollectors;
private final Object statCollectorLock = new Object();
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
- statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
txChain = dataBroker.createTransactionChain(this);
rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
- flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
- meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
- groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
- tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
- portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
- queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
-
- statRpcMsgManagerExecutor.execute(rpcMsgManager);
+ flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+ meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+ groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+ tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+ portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+ queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
+
statDataStoreOperationServ.execute(this);
LOG.info("Statistics Manager started successfully!");
}
}
statCollectors = null;
}
- rpcMsgManager = close(rpcMsgManager);
- statRpcMsgManagerExecutor.shutdown();
+ rpcMsgManager = null;
statDataStoreOperationServ.shutdown();
txChain = close(txChain);
}
// we don't need to block anything - next statistics come soon
final boolean success = dataStoreOperQueue.offer(op);
if ( ! success) {
- LOG.debug("Stat DS/Operational submiter Queue is full!");
+ LOG.debug("Stat DS/Operational submitter Queue is full!");
}
}
public void run() {
/* Neverending cyle - wait for finishing */
while ( ! finishing) {
+ StatDataStoreOperation op = null;
try {
- StatDataStoreOperation op = dataStoreOperQueue.take();
+ op = dataStoreOperQueue.take();
final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
do {
- op.applyOperation(tx);
+ Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
+ if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
+ // dont apply operations for nodes which have been disconnected or if there uuids do not match
+ // this can happen if operations are queued and node is removed.
+ // if the uuids dont match, it means that the stat operation are stale and belong to the same node
+ // which got disconnected and connected again.
+ op.applyOperation(tx);
+ ops++;
+ } else {
+ LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
+ }
- ops++;
if (ops < MAX_BATCH) {
op = dataStoreOperQueue.poll();
} else {
LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
- tx.submit().checkedGet();
+ tx.submit().checkedGet();
} catch (final InterruptedException e) {
- LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ LOG.warn("Stat Manager DS Operation thread interrupted, while " +
+ "waiting for StatDataStore Operation task!", e);
finishing = true;
} catch (final Exception e) {
- LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+ LOG.warn("Unhandled exception during processing statistics for {}. " +
+ "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
txChain.close();
txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
cleanDataStoreOperQueue();
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- StatDataStoreOperation op = dataStoreOperQueue.poll();
-
- // Execute the node removal clean up operation if queued in the
- // operational queue.
- if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
- try {
- LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
- op.applyOperation(null);
- } catch (final Exception e) {
- LOG.warn("Unhandled exception while cleaning up internal data of node [{}]. "
- + "Exception {}",op.getNodeId(), e);
- }
- }
+ dataStoreOperQueue.poll();
}
}
}
}
- @Override
- public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
- final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
- for (final StatPermCollector collector : statCollectors) {
- if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
- return;
- }
- }
- synchronized (statCollectorLock) {
- for (final StatPermCollector collector : statCollectors) {
- if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
- return;
- }
- }
- final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
- statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
- statManagerConfig.getMaxNodesForCollector());
- final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
- newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
- statCollectorsNew.add(newCollector);
- statCollectors = Collections.unmodifiableList(statCollectorsNew);
- }
- }
+ @Override
+ public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
+ final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
+
+
+ Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+ if (collectorUUIDPair == null) {
+ // no collector contains this node,
+ // check if one of the collectors can accommodate it
+ // if no then add a new collector
+
+ synchronized(statCollectorLock) {
+ for (int i = statCollectors.size() - 1; i >= 0; i--) {
+ // start from back of the list as most likely previous ones might be full
+ final StatPermCollector aCollector = statCollectors.get(i);
+ if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
+ // if the collector returns true after adding node, then return
+ nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
+ LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
+ numNodesBeingCollected.incrementAndGet());
+ return;
+ }
+ }
+ // no collector was able to add this node
+ LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
+ final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
+ statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+ statManagerConfig.getMaxNodesForCollector());
+
+ final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
+ statCollectorsNew.add(newCollector);
+ statCollectors = Collections.unmodifiableList(statCollectorsNew);
+ nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
+ LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
+
+ newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+ }
+
+
+ } else {
+ // add to the collector, even if it rejects it.
+ collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+ }
+ }
- @Override
- public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
- flowListeningCommiter.cleanForDisconnect(nodeIdent);
- for (final StatPermCollector collector : statCollectors) {
- if (collector.disconnectedNodeUnregistration(nodeIdent)) {
- if ( ! collector.hasActiveNodes()) {
- synchronized (statCollectorLock) {
- if (collector.hasActiveNodes()) {
- return;
- }
- final List<StatPermCollector> newStatColl =
- new ArrayList<>(statCollectors);
- newStatColl.remove(collector);
- statCollectors = Collections.unmodifiableList(newStatColl);
- }
- }
- return;
- }
- }
- LOG.debug("Node {} has not been removed.", nodeIdent);
- }
+ @Override
+ public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+ flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
+ Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+ if (collectorUUIDPair != null) {
+ StatPermCollector collector = collectorUUIDPair.getLeft();
+ if (collector != null) {
+ nodeCollectorMap.remove(nodeIdent);
+ LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
+
+ if (collector.disconnectedNodeUnregistration(nodeIdent)) {
+ if (!collector.hasActiveNodes()) {
+ synchronized (statCollectorLock) {
+ if (collector.hasActiveNodes()) {
+ return;
+ }
+ final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
+ newStatColl.remove(collector);
+ statCollectors = Collections.unmodifiableList(newStatColl);
+ }
+ }
+ LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
+ } else {
+ LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
+ }
+ } else {
+ LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
+ }
+
+ } else {
+ LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
+ }
+ }
@Override
public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
}
+ @Override
+ public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
+ final StatCapabTypes statCapab) {
+ for (final StatPermCollector collector : statCollectors) {
+ if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
+ return;
+ }
+ }
+ LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
+ }
+
/* Getter internal Statistic Manager Job Classes */
@Override
public StatRpcMsgManager getRpcMsgManager() {
public StatisticsManagerConfig getConfiguration() {
return statManagerConfig;
}
+
+ @Override
+ public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
+ Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
+ if (permCollectorUUIDPair != null) {
+ return permCollectorUUIDPair.getRight();
+ }
+ // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
+ return UUID.fromString("invalid-uuid");
+ }
+
+ @Override
+ public void setOwnershipService(EntityOwnershipService ownershipService) {
+ this.ownershipService = ownershipService;
+ }
+
+ @Override
+ public EntityOwnershipService getOwnershipService() {
+ return this.ownershipService;
+ }
+
}