X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatPermCollectorImpl.java;h=9dd70b5c2837287285feb7b6a54180d0e6866a5f;hp=d008042e8d014e639be7c02f7f922fb847f9746a;hb=f8670b417a2296050152faafe4157705ad2e085d;hpb=d45e906a590d8ee44ea7957179aa378515d6ba47 diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java index d008042e8d..9dd70b5c28 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatPermCollectorImpl.java @@ -1,16 +1,19 @@ package org.opendaylight.controller.md.statistics.manager.impl; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.opendaylight.controller.md.statistics.manager.StatPermCollector; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; @@ -39,7 +42,7 @@ public class StatPermCollectorImpl implements StatPermCollector { private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class); - private final static long STAT_COLLECT_TIME_OUT = 30000L; + private final static long STAT_COLLECT_TIME_OUT = 3000L; private final ExecutorService statNetCollectorServ; private final StatisticsManager manager; @@ -50,12 +53,14 @@ public class StatPermCollectorImpl implements StatPermCollector { private final Object statCollectorLock = new Object(); private final Object statNodeHolderLock = new Object(); + private final Object transNotifyLock = new Object(); private Map, StatNodeInfoHolder> statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); private volatile boolean wakeMe = false; private volatile boolean finishing = false; + private TransactionId actualTransactionId; public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr, final int maxNodeForCollectors) { @@ -76,10 +81,15 @@ public class StatPermCollectorImpl implements StatPermCollector { public void close() { statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } + @Override + public boolean hasActiveNodes() { + return ( ! statNodeHolder.isEmpty()); + } + @Override public boolean isProvidedFlowNodeActive( final InstanceIdentifier flowNode) { @@ -89,9 +99,7 @@ public class StatPermCollectorImpl implements StatPermCollector { @Override public boolean connectedNodeRegistration(final InstanceIdentifier ident, final List statTypes, final Short nrOfSwitchTables) { - if (ident.isWildcarded()) { - LOG.warn("FlowCapableNode IstanceIdentifier {} registration can not be wildcarded!", ident); - } else { + if (isNodeIdentValidForUse(ident)) { if ( ! statNodeHolder.containsKey(ident)) { synchronized (statNodeHolderLock) { final boolean startStatCollecting = statNodeHolder.size() == 0; @@ -119,9 +127,7 @@ public class StatPermCollectorImpl implements StatPermCollector { @Override public boolean disconnectedNodeUnregistration(final InstanceIdentifier ident) { - if (ident.isWildcarded()) { - LOG.warn("FlowCapableNode IstanceIdentifier {} unregistration can not be wildcarded!", ident); - } else { + if (isNodeIdentValidForUse(ident)) { if (statNodeHolder.containsKey(ident)) { synchronized (statNodeHolderLock) { if (statNodeHolder.containsKey(ident)) { @@ -132,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } if (statNodeHolder.isEmpty()) { finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } return true; @@ -143,12 +149,41 @@ public class StatPermCollectorImpl implements StatPermCollector { } @Override - public void collectNextStatistics() { - if (wakeMe) { - synchronized (statCollectorLock) { - if (wakeMe) { - LOG.trace("STAT-COLLECTOR is notified to conntinue"); - statCollectorLock.notify(); + public boolean registerAdditionalNodeFeature(final InstanceIdentifier ident, + final StatCapabTypes statCapab) { + if (isNodeIdentValidForUse(ident)) { + if ( ! statNodeHolder.containsKey(ident)) { + return false; + } + final StatNodeInfoHolder statNode = statNodeHolder.get(ident); + if ( ! statNode.getStatMarkers().contains(statCapab)) { + synchronized (statNodeHolderLock) { + if ( ! statNode.getStatMarkers().contains(statCapab)) { + final List statCapabForEdit = new ArrayList<>(statNode.getStatMarkers()); + statCapabForEdit.add(statCapab); + final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(), + Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables()); + + final Map, StatNodeInfoHolder> statNodes = + new HashMap<>(statNodeHolder); + statNodes.put(ident, nodeInfoHolder); + statNodeHolder = Collections.unmodifiableMap(statNodes); + } + } + } + } + return true; + } + + @Override + public void collectNextStatistics(final TransactionId xid) { + if (checkTransactionId(xid)) { + if (wakeMe) { + synchronized (statCollectorLock) { + if (wakeMe) { + LOG.trace("STAT-COLLECTOR is notified to conntinue"); + statCollectorLock.notify(); + } } } } @@ -157,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector { @Override public void run() { try { + // sleep 5 second before collecting all statistics cycles is important + // for loading all Nodes to Operational/DS Thread.sleep(5000); } catch (final InterruptedException e1) { @@ -205,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } catch (final InterruptedException e) { LOG.warn("statCollector has been interrupted waiting stat Response sleep", e); } finally { + setActualTransactionId(null); wakeMe = false; } } @@ -220,53 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector { if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) { break; } - switch (statMarker) { - case PORT_STATS: - LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllPortsStat(actualNodeRef); - waitingForNotification(); - break; - case QUEUE_STATS: - LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllQueueStat(actualNodeRef); - waitingForNotification(); - break; - case TABLE_STATS: - LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllTablesStat(actualNodeRef); - waitingForNotification(); - break; - case GROUP_STATS: - LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getGroupFeaturesStat(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef); - waitingForNotification(); - break; - case METER_STATS: - LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getMeterFeaturesStat(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllMetersStat(actualNodeRef); - waitingForNotification(); - break; - case FLOW_STATS: - LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef); - waitingForNotification(); - LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); - for (short i = 0; i < maxTables; i++) { - final TableId tableId = new TableId(i); - manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + try { + switch (statMarker) { + case PORT_STATS: + LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case QUEUE_STATS: + LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get()); + waitingForNotification(); + break; + case TABLE_STATS: + LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get()); + waitingForNotification(); + break; + case GROUP_STATS: + LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case METER_STATS: + LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get()); + waitingForNotification(); + break; + case FLOW_STATS: + LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get()); + waitingForNotification(); + LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); + for (short i = 0; i < maxTables; i++) { + final TableId tableId = new TableId(i); + manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + } + break; + default: + /* Exception for programmers in implementation cycle */ + throw new IllegalStateException("Not implemented ASK for " + statMarker); } - break; - default: - /* Exception for programmers in implementation cycle */ - throw new IllegalStateException("Not implemented ASK for " + statMarker); + } catch (InterruptedException | ExecutionException ex) { + LOG.warn("Unexpected RPC exception by call RPC Future!", ex); + continue; } } } @@ -297,9 +336,28 @@ public class StatPermCollectorImpl implements StatPermCollector { } } - @Override - public boolean hasActiveNodes() { - return ( ! statNodeHolder.isEmpty()); + private boolean isNodeIdentValidForUse(final InstanceIdentifier ident) { + if (ident == null) { + LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!"); + return false; + } + if (ident.isWildcarded()) { + LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident); + return false; + } + return true; + } + + private boolean checkTransactionId(final TransactionId xid) { + synchronized (transNotifyLock) { + return actualTransactionId != null && actualTransactionId.equals(xid); + } + } + + private void setActualTransactionId(final TransactionId transactionId) { + synchronized (transNotifyLock) { + actualTransactionId = transactionId; + } } }