Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatisticsManagerImpl.java
index e099da32ac5e2245ea31e650b2c689d0999f4745..3bd0cc2a65303629a6635336c4f3b97143aca98d 100644 (file)
@@ -8,15 +8,21 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadFactory;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -28,15 +34,14 @@ import org.opendaylight.openflowplugin.applications.statistics.manager.StatListe
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
@@ -48,9 +53,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
 * statistics-manager
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -58,7 +60,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 * StatisticsManagerImpl
 * It represent a central point for whole module. Implementation
 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
-* Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
+* Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
 * In next, StatisticsManager provides all DS contact Transaction services.
 *
@@ -73,10 +75,13 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private static final int MAX_BATCH = 100;
 
    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
+   private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
+   private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
+
 
-   private final DataBroker dataBroker;
-   private final ExecutorService statRpcMsgManagerExecutor;
+    private final DataBroker dataBroker;
    private final ExecutorService statDataStoreOperationServ;
+   private EntityOwnershipService ownershipService;
    private StatRpcMsgManager rpcMsgManager;
    private List<StatPermCollector> statCollectors;
    private final Object statCollectorLock = new Object();
@@ -98,7 +103,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
        ThreadFactory threadFact;
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
-       statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
        txChain =  dataBroker.createTransactionChain(this);
@@ -111,14 +115,13 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
        statCollectors = Collections.emptyList();
        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
-       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
-       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
-       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
-       tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
-       portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
-       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
-
-       statRpcMsgManagerExecutor.execute(rpcMsgManager);
+       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+       tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+       portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
+
        statDataStoreOperationServ.execute(this);
        LOG.info("Statistics Manager started successfully!");
    }
@@ -147,8 +150,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
            }
            statCollectors = null;
        }
-       rpcMsgManager = close(rpcMsgManager);
-       statRpcMsgManagerExecutor.shutdown();
+       rpcMsgManager = null;
        statDataStoreOperationServ.shutdown();
        txChain = close(txChain);
    }
@@ -158,7 +160,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        // we don't need to block anything - next statistics come soon
        final boolean success = dataStoreOperQueue.offer(op);
        if ( ! success) {
-           LOG.debug("Stat DS/Operational submiter Queue is full!");
+           LOG.debug("Stat DS/Operational submitter Queue is full!");
        }
    }
 
@@ -166,16 +168,26 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public void run() {
        /* Neverending cyle - wait for finishing */
        while ( ! finishing) {
+           StatDataStoreOperation op = null;
            try {
-               StatDataStoreOperation op = dataStoreOperQueue.take();
+               op = dataStoreOperQueue.take();
                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
 
                int ops = 0;
                do {
-                   op.applyOperation(tx);
+                   Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
+                   if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
+                       // dont apply operations for nodes which have been disconnected or if there uuids do not match
+                       // this can happen if operations are queued and node is removed.
+                       // if the uuids dont match, it means that the stat operation are stale and belong to the same node
+                       // which got disconnected and connected again.
+                       op.applyOperation(tx);
+                       ops++;
+                   } else {
+                       LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
+                   }
 
-                   ops++;
                    if (ops < MAX_BATCH) {
                        op = dataStoreOperQueue.poll();
                    } else {
@@ -185,12 +197,14 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-                   tx.submit().checkedGet();
+               tx.submit().checkedGet();
            } catch (final InterruptedException e) {
-               LOG.warn("Stat Manager DS Operation thread interupted!", e);
+               LOG.warn("Stat Manager DS Operation thread interrupted, while " +
+                       "waiting for StatDataStore Operation task!", e);
                finishing = true;
            } catch (final Exception e) {
-               LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+               LOG.warn("Unhandled exception during processing statistics for {}. " +
+                       "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
                txChain.close();
                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
                cleanDataStoreOperQueue();
@@ -203,19 +217,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
-           StatDataStoreOperation op = dataStoreOperQueue.poll();
-
-           // Execute the node removal clean up operation if queued in the
-           // operational queue.
-           if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
-               try {
-                   LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
-                   op.applyOperation(null);
-               } catch (final Exception e) {
-                   LOG.warn("Unhandled exception while cleaning up internal data of node [{}]. "
-                           + "Exception {}",op.getNodeId(), e);
-               }
-           }
+           dataStoreOperQueue.poll();
        }
    }
 
@@ -249,52 +251,86 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        }
    }
 
-   @Override
-   public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
-           final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-               return;
-           }
-       }
-       synchronized (statCollectorLock) {
-           for (final StatPermCollector collector : statCollectors) {
-               if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-                   return;
-               }
-           }
-           final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
-                   statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
-                   statManagerConfig.getMaxNodesForCollector());
-           final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
-           newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
-           statCollectorsNew.add(newCollector);
-           statCollectors = Collections.unmodifiableList(statCollectorsNew);
-       }
-   }
+    @Override
+    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
+            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
+
+
+        Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+        if (collectorUUIDPair == null) {
+            // no collector contains this node,
+            // check if one of the collectors can accommodate it
+            // if no then add a new collector
+
+            synchronized(statCollectorLock) {
+                for (int i = statCollectors.size() - 1; i >= 0; i--) {
+                    // start from back of the list as most likely previous ones might be full
+                    final StatPermCollector aCollector = statCollectors.get(i);
+                    if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
+                        // if the collector returns true after adding node, then return
+                        nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
+                        LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
+                                numNodesBeingCollected.incrementAndGet());
+                        return;
+                    }
+                }
+                // no collector was able to add this node
+                LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
+                final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
+                        statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+                        statManagerConfig.getMaxNodesForCollector());
+
+                final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
+                statCollectorsNew.add(newCollector);
+                statCollectors = Collections.unmodifiableList(statCollectorsNew);
+                nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
+                LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
+
+                newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+            }
+
+
+        } else {
+            // add to the collector, even if it rejects it.
+            collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+        }
+    }
 
-   @Override
-   public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
-       flowListeningCommiter.cleanForDisconnect(nodeIdent);
 
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.disconnectedNodeUnregistration(nodeIdent)) {
-               if ( ! collector.hasActiveNodes()) {
-                   synchronized (statCollectorLock) {
-                       if (collector.hasActiveNodes()) {
-                           return;
-                       }
-                       final List<StatPermCollector> newStatColl =
-                               new ArrayList<>(statCollectors);
-                       newStatColl.remove(collector);
-                       statCollectors = Collections.unmodifiableList(newStatColl);
-                   }
-               }
-               return;
-           }
-       }
-       LOG.debug("Node {} has not been removed.", nodeIdent);
-   }
+    @Override
+    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+        flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
+        Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+        if (collectorUUIDPair != null) {
+            StatPermCollector collector = collectorUUIDPair.getLeft();
+            if (collector != null) {
+                nodeCollectorMap.remove(nodeIdent);
+                LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
+
+                if (collector.disconnectedNodeUnregistration(nodeIdent)) {
+                    if (!collector.hasActiveNodes()) {
+                        synchronized (statCollectorLock) {
+                            if (collector.hasActiveNodes()) {
+                                return;
+                            }
+                            final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
+                            newStatColl.remove(collector);
+                            statCollectors = Collections.unmodifiableList(newStatColl);
+                        }
+                    }
+                    LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
+                } else {
+                    LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
+                }
+            } else {
+                LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
+            }
+
+        } else {
+            LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
+        }
+    }
 
    @Override
    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
@@ -307,6 +343,17 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
    }
 
+    @Override
+    public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
+                                              final StatCapabTypes statCapab) {
+        for (final StatPermCollector collector : statCollectors) {
+            if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
+                return;
+            }
+        }
+        LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
+    }
+
    /* Getter internal Statistic Manager Job Classes */
    @Override
    public StatRpcMsgManager getRpcMsgManager() {
@@ -353,5 +400,26 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
     public StatisticsManagerConfig getConfiguration() {
         return statManagerConfig;
     }
+
+    @Override
+    public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
+        Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
+        if (permCollectorUUIDPair != null) {
+            return permCollectorUUIDPair.getRight();
+        }
+        // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
+        return UUID.fromString("invalid-uuid");
+    }
+
+    @Override
+    public void setOwnershipService(EntityOwnershipService ownershipService) {
+        this.ownershipService = ownershipService;
+    }
+
+    @Override
+    public EntityOwnershipService getOwnershipService() {
+        return this.ownershipService;
+    }
+
 }