X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatPermCollectorImpl.java;h=9dd70b5c2837287285feb7b6a54180d0e6866a5f;hp=ff1778e8aa9900c0512c6888d8580c92b4c75482;hb=f8670b417a2296050152faafe4157705ad2e085d;hpb=7f2ecd54dd3eb09af469a610fdd541b48ed95b80 diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java index ff1778e8aa..9dd70b5c28 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java @@ -6,12 +6,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.opendaylight.controller.md.statistics.manager.StatPermCollector; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; @@ -40,7 +42,7 @@ public class StatPermCollectorImpl implements StatPermCollector { private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class); - private final static long STAT_COLLECT_TIME_OUT = 30000L; + private final static long STAT_COLLECT_TIME_OUT = 3000L; private final ExecutorService statNetCollectorServ; private final StatisticsManager manager; @@ -51,12 +53,14 @@ public class StatPermCollectorImpl implements StatPermCollector { private final Object statCollectorLock = new Object(); private final Object statNodeHolderLock = new Object(); + private final Object transNotifyLock = new Object(); private Map, StatNodeInfoHolder> statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); private volatile boolean wakeMe = false; private volatile boolean finishing = false; + private TransactionId actualTransactionId; public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr, final int maxNodeForCollectors) { @@ -77,7 +81,7 @@ public class StatPermCollectorImpl implements StatPermCollector { public void close() { statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } @@ -134,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } if (statNodeHolder.isEmpty()) { finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } return true; @@ -172,12 +176,14 @@ public class StatPermCollectorImpl implements StatPermCollector { } @Override - public void collectNextStatistics() { - if (wakeMe) { - synchronized (statCollectorLock) { - if (wakeMe) { - LOG.trace("STAT-COLLECTOR is notified to conntinue"); - statCollectorLock.notify(); + public void collectNextStatistics(final TransactionId xid) { + if (checkTransactionId(xid)) { + if (wakeMe) { + synchronized (statCollectorLock) { + if (wakeMe) { + LOG.trace("STAT-COLLECTOR is notified to conntinue"); + statCollectorLock.notify(); + } } } } @@ -186,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector { @Override public void run() { try { + // sleep 5 second before collecting all statistics cycles is important + // for loading all Nodes to Operational/DS Thread.sleep(5000); } catch (final InterruptedException e1) { @@ -234,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } catch (final InterruptedException e) { LOG.warn("statCollector has been interrupted waiting stat Response sleep", e); } finally { + setActualTransactionId(null); wakeMe = false; } } @@ -249,49 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector { if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) { break; } - switch (statMarker) { - case PORT_STATS: - LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllPortsStat(actualNodeRef); - waitingForNotification(); - break; - case QUEUE_STATS: - LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllQueueStat(actualNodeRef); - waitingForNotification(); - break; - case TABLE_STATS: - LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllTablesStat(actualNodeRef); - waitingForNotification(); - break; - case GROUP_STATS: - LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef); - waitingForNotification(); - break; - case METER_STATS: - LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllMetersStat(actualNodeRef); - waitingForNotification(); - break; - case FLOW_STATS: - LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef); - waitingForNotification(); - LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); - for (short i = 0; i < maxTables; i++) { - final TableId tableId = new TableId(i); - manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + try { + switch (statMarker) { + case PORT_STATS: + LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case QUEUE_STATS: + LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get()); + waitingForNotification(); + break; + case TABLE_STATS: + LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get()); + waitingForNotification(); + break; + case GROUP_STATS: + LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case METER_STATS: + LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get()); + waitingForNotification(); + break; + case FLOW_STATS: + LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get()); + waitingForNotification(); + LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); + for (short i = 0; i < maxTables; i++) { + final TableId tableId = new TableId(i); + manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + } + break; + default: + /* Exception for programmers in implementation cycle */ + throw new IllegalStateException("Not implemented ASK for " + statMarker); } - break; - default: - /* Exception for programmers in implementation cycle */ - throw new IllegalStateException("Not implemented ASK for " + statMarker); + } catch (InterruptedException | ExecutionException ex) { + LOG.warn("Unexpected RPC exception by call RPC Future!", ex); + continue; } } } @@ -333,5 +347,17 @@ public class StatPermCollectorImpl implements StatPermCollector { } return true; } + + private boolean checkTransactionId(final TransactionId xid) { + synchronized (transNotifyLock) { + return actualTransactionId != null && actualTransactionId.equals(xid); + } + } + + private void setActualTransactionId(final TransactionId transactionId) { + synchronized (transNotifyLock) { + actualTransactionId = transactionId; + } + } }