Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatPermCollectorImpl.java
index d008042e8d014e639be7c02f7f922fb847f9746a..9dd70b5c2837287285feb7b6a54180d0e6866a5f 100644 (file)
@@ -1,16 +1,19 @@
 package org.opendaylight.controller.md.statistics.manager.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
@@ -39,7 +42,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
 
-    private final static long STAT_COLLECT_TIME_OUT = 30000L;
+    private final static long STAT_COLLECT_TIME_OUT = 3000L;
 
     private final ExecutorService statNetCollectorServ;
     private final StatisticsManager manager;
@@ -50,12 +53,14 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final Object statCollectorLock = new Object();
     private final Object statNodeHolderLock = new Object();
+    private final Object transNotifyLock = new Object();
 
     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
 
     private volatile boolean wakeMe = false;
     private volatile boolean finishing = false;
+    private TransactionId actualTransactionId;
 
     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
             final int maxNodeForCollectors) {
@@ -76,10 +81,15 @@ public class StatPermCollectorImpl implements StatPermCollector {
     public void close() {
         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
         finishing = true;
-        collectNextStatistics();
+        collectNextStatistics(actualTransactionId);
         statNetCollectorServ.shutdown();
     }
 
+    @Override
+    public boolean hasActiveNodes() {
+        return ( ! statNodeHolder.isEmpty());
+    }
+
     @Override
     public boolean isProvidedFlowNodeActive(
             final InstanceIdentifier<Node> flowNode) {
@@ -89,9 +99,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
     @Override
     public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
-        if (ident.isWildcarded()) {
-            LOG.warn("FlowCapableNode IstanceIdentifier {} registration can not be wildcarded!", ident);
-        } else {
+        if (isNodeIdentValidForUse(ident)) {
             if ( ! statNodeHolder.containsKey(ident)) {
                 synchronized (statNodeHolderLock) {
                     final boolean startStatCollecting = statNodeHolder.size() == 0;
@@ -119,9 +127,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     @Override
     public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
-        if (ident.isWildcarded()) {
-            LOG.warn("FlowCapableNode IstanceIdentifier {} unregistration can not be wildcarded!", ident);
-        } else {
+        if (isNodeIdentValidForUse(ident)) {
             if (statNodeHolder.containsKey(ident)) {
                 synchronized (statNodeHolderLock) {
                     if (statNodeHolder.containsKey(ident)) {
@@ -132,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
                     }
                     if (statNodeHolder.isEmpty()) {
                         finishing = true;
-                        collectNextStatistics();
+                        collectNextStatistics(actualTransactionId);
                         statNetCollectorServ.shutdown();
                     }
                     return true;
@@ -143,12 +149,41 @@ public class StatPermCollectorImpl implements StatPermCollector {
     }
 
     @Override
-    public void collectNextStatistics() {
-        if (wakeMe) {
-            synchronized (statCollectorLock) {
-                if (wakeMe) {
-                    LOG.trace("STAT-COLLECTOR is notified to conntinue");
-                    statCollectorLock.notify();
+    public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
+            final StatCapabTypes statCapab) {
+        if (isNodeIdentValidForUse(ident)) {
+            if ( ! statNodeHolder.containsKey(ident)) {
+                return false;
+            }
+            final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
+            if ( ! statNode.getStatMarkers().contains(statCapab)) {
+                synchronized (statNodeHolderLock) {
+                    if ( ! statNode.getStatMarkers().contains(statCapab)) {
+                        final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
+                        statCapabForEdit.add(statCapab);
+                        final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
+                                Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
+
+                        final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
+                                new HashMap<>(statNodeHolder);
+                        statNodes.put(ident, nodeInfoHolder);
+                        statNodeHolder = Collections.unmodifiableMap(statNodes);
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void collectNextStatistics(final TransactionId xid) {
+        if (checkTransactionId(xid)) {
+            if (wakeMe) {
+                synchronized (statCollectorLock) {
+                    if (wakeMe) {
+                        LOG.trace("STAT-COLLECTOR is notified to conntinue");
+                        statCollectorLock.notify();
+                    }
                 }
             }
         }
@@ -157,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector {
     @Override
     public void run() {
         try {
+            // sleep 5 second before collecting all statistics cycles is important
+            // for loading all Nodes to Operational/DS
             Thread.sleep(5000);
         }
         catch (final InterruptedException e1) {
@@ -205,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
             } catch (final InterruptedException e) {
                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
             } finally {
+                setActualTransactionId(null);
                 wakeMe = false;
             }
         }
@@ -220,53 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector {
                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
                     break;
                 }
-                switch (statMarker) {
-                case PORT_STATS:
-                    LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case QUEUE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case TABLE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case GROUP_STATS:
-                    LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getGroupFeaturesStat(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case METER_STATS:
-                    LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getMeterFeaturesStat(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case FLOW_STATS:
-                    LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
-                    waitingForNotification();
-                    LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
-                    for (short i = 0; i < maxTables; i++) {
-                        final TableId tableId = new TableId(i);
-                        manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                try {
+                    switch (statMarker) {
+                    case PORT_STATS:
+                        LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case QUEUE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case TABLE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case GROUP_STATS:
+                        LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case METER_STATS:
+                        LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case FLOW_STATS:
+                        LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+                        for (short i = 0; i < maxTables; i++) {
+                            final TableId tableId = new TableId(i);
+                            manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                        }
+                        break;
+                    default:
+                        /* Exception for programmers in implementation cycle */
+                        throw new IllegalStateException("Not implemented ASK for " + statMarker);
                     }
-                    break;
-                default:
-                    /* Exception for programmers in implementation cycle */
-                    throw new IllegalStateException("Not implemented ASK for " + statMarker);
+                } catch (InterruptedException | ExecutionException ex) {
+                    LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+                    continue;
                 }
             }
         }
@@ -297,9 +336,28 @@ public class StatPermCollectorImpl implements StatPermCollector {
         }
     }
 
-    @Override
-    public boolean hasActiveNodes() {
-        return ( ! statNodeHolder.isEmpty());
+    private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
+        if (ident == null) {
+            LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
+            return false;
+        }
+        if (ident.isWildcarded()) {
+            LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean checkTransactionId(final TransactionId xid) {
+        synchronized (transNotifyLock) {
+            return actualTransactionId != null && actualTransactionId.equals(xid);
+        }
+    }
+
+    private void setActualTransactionId(final TransactionId transactionId) {
+        synchronized (transNotifyLock) {
+            actualTransactionId = transactionId;
+        }
     }
 }