Merge "Add MD-SAL artifacts"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
index 8430549be177fce7b84522d723f2ff6d7ddf0a14..edf9fad433f6af8244948fc181dfce97b3d24177 100644 (file)
@@ -8,21 +8,13 @@
 
 package org.opendaylight.controller.md.statistics.manager.impl;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
@@ -47,8 +39,14 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
 
 /**
 * statistics-manager
@@ -68,14 +66,12 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
 
-   private static final int QUEUE_DEPTH = 500;
-   private static final int MAX_BATCH = 1;
+   private static final int QUEUE_DEPTH = 5000;
+   private static final int MAX_BATCH = 100;
 
    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
 
    private final DataBroker dataBroker;
-   private final int maxNodesForCollectors;
-   private long minReqNetMonitInt;
    private final ExecutorService statRpcMsgManagerExecutor;
    private final ExecutorService statDataStoreOperationServ;
    private StatRpcMsgManager rpcMsgManager;
@@ -92,23 +88,24 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
 
-   public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) {
+   private final StatisticsManagerConfig statManagerConfig;
+
+   public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
+       this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
        ThreadFactory threadFact;
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
-       maxNodesForCollectors = maxNodesForCollector;
        txChain =  dataBroker.createTransactionChain(this);
    }
 
    @Override
    public void start(final NotificationProviderService notifService,
-           final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+           final RpcConsumerRegistry rpcRegistry) {
        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-       this.minReqNetMonitInt = minReqNetMonitInt;
-       rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt);
+       rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
        statCollectors = Collections.emptyList();
        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
@@ -125,6 +122,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    @Override
    public void close() throws Exception {
+       LOG.info("StatisticsManager close called");
        finishing = true;
        if (nodeRegistrator != null) {
            nodeRegistrator.close();
@@ -205,31 +203,22 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-               try {
                    tx.submit().checkedGet();
-               } catch (final TransactionCommitFailedException e) {
-                   LOG.warn("Stat DataStoreOperation unexpected State!", e);
-                   txChain.close();
-                   txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
-                   cleanDataStoreOperQueue();
-               }
-           }
-           catch (final IllegalStateException e) {
-               LOG.warn("Stat DataStoreOperation unexpected State!", e);
-           }
-           catch (final InterruptedException e) {
+           } catch (final InterruptedException e) {
                LOG.warn("Stat Manager DS Operation thread interupted!", e);
                finishing = true;
-           }
-           catch (final Exception e) {
-               LOG.warn("Stat DataStore Operation executor fail!", e);
+           } catch (final Exception e) {
+               LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+               txChain.close();
+               txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
+               cleanDataStoreOperQueue();
            }
        }
        // Drain all events, making sure any blocked threads are unblocked
        cleanDataStoreOperQueue();
    }
 
-   private void cleanDataStoreOperQueue() {
+   private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
            dataStoreOperQueue.poll();
@@ -240,9 +229,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
            final Throwable cause) {
        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
-       txChain.close();
-       txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
-       cleanDataStoreOperQueue();
    }
 
    @Override
@@ -284,7 +270,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
                }
            }
            final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
-                   minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors);
+                   statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+                   statManagerConfig.getMaxNodesForCollector());
            final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
            newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
            statCollectorsNew.add(newCollector);
@@ -294,6 +281,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    @Override
    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+       flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
        for (final StatPermCollector collector : statCollectors) {
            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
                if ( ! collector.hasActiveNodes()) {
@@ -310,7 +299,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
                return;
            }
        }
-       LOG.debug("Node {} has not removed.", nodeIdent);
+       LOG.debug("Node {} has not been removed.", nodeIdent);
+   }
+
+   @Override
+   public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
+           final StatCapabTypes statCapab) {
+       for (final StatPermCollector collector : statCollectors) {
+           if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
+               return;
+           }
+       }
+       LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
    }
 
    /* Getter internal Statistic Manager Job Classes */
@@ -354,5 +354,10 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
        return portNotifyCommiter;
    }
+
+    @Override
+    public StatisticsManagerConfig getConfiguration() {
+        return statManagerConfig;
+    }
 }