Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatisticsManagerImpl.java
index 82e46be007ab37271728a7c2eb6bed12515bbbd4..3bd0cc2a65303629a6635336c4f3b97143aca98d 100644 (file)
@@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -78,9 +79,9 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
 
 
-   private final DataBroker dataBroker;
-   private final ExecutorService statRpcMsgManagerExecutor;
+    private final DataBroker dataBroker;
    private final ExecutorService statDataStoreOperationServ;
+   private EntityOwnershipService ownershipService;
    private StatRpcMsgManager rpcMsgManager;
    private List<StatPermCollector> statCollectors;
    private final Object statCollectorLock = new Object();
@@ -102,7 +103,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
        ThreadFactory threadFact;
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
-       statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
        txChain =  dataBroker.createTransactionChain(this);
@@ -115,14 +115,13 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
        statCollectors = Collections.emptyList();
        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
-       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
-       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
-       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
-       tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
-       portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
-       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
-
-       statRpcMsgManagerExecutor.execute(rpcMsgManager);
+       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+       tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+       portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
+
        statDataStoreOperationServ.execute(this);
        LOG.info("Statistics Manager started successfully!");
    }
@@ -151,8 +150,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
            }
            statCollectors = null;
        }
-       rpcMsgManager = close(rpcMsgManager);
-       statRpcMsgManagerExecutor.shutdown();
+       rpcMsgManager = null;
        statDataStoreOperationServ.shutdown();
        txChain = close(txChain);
    }
@@ -162,7 +160,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        // we don't need to block anything - next statistics come soon
        final boolean success = dataStoreOperQueue.offer(op);
        if ( ! success) {
-           LOG.debug("Stat DS/Operational submiter Queue is full!");
+           LOG.debug("Stat DS/Operational submitter Queue is full!");
        }
    }
 
@@ -170,8 +168,9 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public void run() {
        /* Neverending cyle - wait for finishing */
        while ( ! finishing) {
+           StatDataStoreOperation op = null;
            try {
-               StatDataStoreOperation op = dataStoreOperQueue.take();
+               op = dataStoreOperQueue.take();
                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
 
@@ -200,10 +199,12 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                tx.submit().checkedGet();
            } catch (final InterruptedException e) {
-               LOG.warn("Stat Manager DS Operation thread interupted!", e);
+               LOG.warn("Stat Manager DS Operation thread interrupted, while " +
+                       "waiting for StatDataStore Operation task!", e);
                finishing = true;
            } catch (final Exception e) {
-               LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+               LOG.warn("Unhandled exception during processing statistics for {}. " +
+                       "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
                txChain.close();
                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
                cleanDataStoreOperQueue();
@@ -342,6 +343,17 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
    }
 
+    @Override
+    public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
+                                              final StatCapabTypes statCapab) {
+        for (final StatPermCollector collector : statCollectors) {
+            if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
+                return;
+            }
+        }
+        LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
+    }
+
    /* Getter internal Statistic Manager Job Classes */
    @Override
    public StatRpcMsgManager getRpcMsgManager() {
@@ -398,5 +410,16 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
         // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
         return UUID.fromString("invalid-uuid");
     }
+
+    @Override
+    public void setOwnershipService(EntityOwnershipService ownershipService) {
+        this.ownershipService = ownershipService;
+    }
+
+    @Override
+    public EntityOwnershipService getOwnershipService() {
+        return this.ownershipService;
+    }
+
 }