Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatPermCollectorImpl.java
index ff1778e..9dd70b5 100644 (file)
@@ -6,12 +6,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
@@ -40,7 +42,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
 
-    private final static long STAT_COLLECT_TIME_OUT = 30000L;
+    private final static long STAT_COLLECT_TIME_OUT = 3000L;
 
     private final ExecutorService statNetCollectorServ;
     private final StatisticsManager manager;
@@ -51,12 +53,14 @@ public class StatPermCollectorImpl implements StatPermCollector {
 
     private final Object statCollectorLock = new Object();
     private final Object statNodeHolderLock = new Object();
+    private final Object transNotifyLock = new Object();
 
     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
 
     private volatile boolean wakeMe = false;
     private volatile boolean finishing = false;
+    private TransactionId actualTransactionId;
 
     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
             final int maxNodeForCollectors) {
@@ -77,7 +81,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
     public void close() {
         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
         finishing = true;
-        collectNextStatistics();
+        collectNextStatistics(actualTransactionId);
         statNetCollectorServ.shutdown();
     }
 
@@ -134,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
                     }
                     if (statNodeHolder.isEmpty()) {
                         finishing = true;
-                        collectNextStatistics();
+                        collectNextStatistics(actualTransactionId);
                         statNetCollectorServ.shutdown();
                     }
                     return true;
@@ -172,12 +176,14 @@ public class StatPermCollectorImpl implements StatPermCollector {
     }
 
     @Override
-    public void collectNextStatistics() {
-        if (wakeMe) {
-            synchronized (statCollectorLock) {
-                if (wakeMe) {
-                    LOG.trace("STAT-COLLECTOR is notified to conntinue");
-                    statCollectorLock.notify();
+    public void collectNextStatistics(final TransactionId xid) {
+        if (checkTransactionId(xid)) {
+            if (wakeMe) {
+                synchronized (statCollectorLock) {
+                    if (wakeMe) {
+                        LOG.trace("STAT-COLLECTOR is notified to conntinue");
+                        statCollectorLock.notify();
+                    }
                 }
             }
         }
@@ -186,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector {
     @Override
     public void run() {
         try {
+            // sleep 5 second before collecting all statistics cycles is important
+            // for loading all Nodes to Operational/DS
             Thread.sleep(5000);
         }
         catch (final InterruptedException e1) {
@@ -234,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
             } catch (final InterruptedException e) {
                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
             } finally {
+                setActualTransactionId(null);
                 wakeMe = false;
             }
         }
@@ -249,49 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector {
                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
                     break;
                 }
-                switch (statMarker) {
-                case PORT_STATS:
-                    LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case QUEUE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case TABLE_STATS:
-                    LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case GROUP_STATS:
-                    LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case METER_STATS:
-                    LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
-                    waitingForNotification();
-                    manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
-                    waitingForNotification();
-                    break;
-                case FLOW_STATS:
-                    LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
-                    manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
-                    waitingForNotification();
-                    LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
-                    for (short i = 0; i < maxTables; i++) {
-                        final TableId tableId = new TableId(i);
-                        manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                try {
+                    switch (statMarker) {
+                    case PORT_STATS:
+                        LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case QUEUE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case TABLE_STATS:
+                        LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case GROUP_STATS:
+                        LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case METER_STATS:
+                        LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+                        waitingForNotification();
+                        setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+                        waitingForNotification();
+                        break;
+                    case FLOW_STATS:
+                        LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+                        setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+                        waitingForNotification();
+                        LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+                        for (short i = 0; i < maxTables; i++) {
+                            final TableId tableId = new TableId(i);
+                            manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+                        }
+                        break;
+                    default:
+                        /* Exception for programmers in implementation cycle */
+                        throw new IllegalStateException("Not implemented ASK for " + statMarker);
                     }
-                    break;
-                default:
-                    /* Exception for programmers in implementation cycle */
-                    throw new IllegalStateException("Not implemented ASK for " + statMarker);
+                } catch (InterruptedException | ExecutionException ex) {
+                    LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+                    continue;
                 }
             }
         }
@@ -333,5 +347,17 @@ public class StatPermCollectorImpl implements StatPermCollector {
         }
         return true;
     }
+
+    private boolean checkTransactionId(final TransactionId xid) {
+        synchronized (transNotifyLock) {
+            return actualTransactionId != null && actualTransactionId.equals(xid);
+        }
+    }
+
+    private void setActualTransactionId(final TransactionId transactionId) {
+        synchronized (transNotifyLock) {
+            actualTransactionId = transactionId;
+        }
+    }
 }