From f8670b417a2296050152faafe4157705ad2e085d Mon Sep 17 00:00:00 2001 From: Vaclav Demcak Date: Tue, 2 Dec 2014 11:36:34 +0100 Subject: [PATCH] Fix bug 2450 - Statistics collection slow - performance * fix timeout value for statWaiter to notification (30 sec is mistake - 3sec is correct value) * add check TransactionId for every notification (prevent unexpected notification for collecting next statistics) * timeout has to clear TransactionId (prevention for notification from slower statistics processes * patch 3 - revert the log level msg (debuging issue in StatPermCollectorImpl) - change an expiration calculation for cached RPC results (StatRpcMsgManagerImpl) - fix conditions for call notifyToCollectNextStat (Meter, Group) succesfull tested for karaf-compatible Change-Id: I54d7fe9e5c1a5d265c9378507fce1163691b62e5 Signed-off-by: Vaclav Demcak --- .../statistics/manager/StatPermCollector.java | 3 +- .../statistics/manager/StatRpcMsgManager.java | 20 +-- .../statistics/manager/StatisticsManager.java | 3 +- .../impl/StatAbstractNotifyCommit.java | 4 +- .../manager/impl/StatListenCommitFlow.java | 2 +- .../manager/impl/StatListenCommitGroup.java | 6 +- .../manager/impl/StatListenCommitMeter.java | 6 +- .../manager/impl/StatListenCommitQueue.java | 2 +- .../manager/impl/StatNotifyCommitPort.java | 2 +- .../manager/impl/StatNotifyCommitTable.java | 2 +- .../manager/impl/StatPermCollectorImpl.java | 128 +++++++++++------- .../manager/impl/StatRpcMsgManagerImpl.java | 71 ++++++---- .../manager/impl/StatisticsManagerImpl.java | 32 +++-- 13 files changed, 168 insertions(+), 113 deletions(-) diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatPermCollector.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatPermCollector.java index 16ad28dd4f..94d6dfa651 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatPermCollector.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatPermCollector.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.md.statistics.manager; import java.util.List; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; @@ -112,7 +113,7 @@ public interface StatPermCollector extends Runnable, AutoCloseable { * It is call from collecting allStatistics methods as a future result for * Operational/DS statistic store call (does not matter in the outcome). */ - void collectNextStatistics(); + void collectNextStatistics(TransactionId xid); /** * Method returns true if collector has registered some active nodes diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatRpcMsgManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatRpcMsgManager.java index 0576c2a645..62319ad594 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatRpcMsgManager.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatRpcMsgManager.java @@ -21,6 +21,7 @@ import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.common.RpcResult; import com.google.common.base.Optional; +import com.google.common.util.concurrent.SettableFuture; /** * statistics-manager @@ -77,7 +78,8 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param future - result every Device RPC call */ - void registrationRpcFutureCallBack(Future> future, D inputObj, NodeRef ref); + void registrationRpcFutureCallBack( + Future> future, D inputObj, NodeRef ref, SettableFuture resultTransId); /** * Method adds Notification which is marked as Multipart to the transaction cash @@ -104,7 +106,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllGroupsStat(NodeRef nodeRef); + Future getAllGroupsStat(NodeRef nodeRef); /** * Method wraps OpendaylightGroupStatisticsService.getGroupDescription @@ -112,7 +114,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllGroupsConfStats(NodeRef nodeRef); + Future getAllGroupsConfStats(NodeRef nodeRef); /** * Method wraps OpendaylightMeterStatisticsService.getGroupFeatures @@ -128,7 +130,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllMetersStat(NodeRef nodeRef); + Future getAllMetersStat(NodeRef nodeRef); /** * Method wraps OpendaylightMeterStatisticsService.getAllMeterConfigStatistics @@ -136,7 +138,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllMeterConfigStat(NodeRef nodeRef); + Future getAllMeterConfigStat(NodeRef nodeRef); /** * Method wraps OpendaylightMeterStatisticsService.getMeterFeatures @@ -152,7 +154,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllFlowsStat(NodeRef nodeRef); + Future getAllFlowsStat(NodeRef nodeRef); /** * Method wraps OpendaylightFlowStatisticsService.getAggregateFlowStatisticsFromFlowTableForAllFlows @@ -169,7 +171,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllPortsStat(NodeRef nodeRef); + Future getAllPortsStat(NodeRef nodeRef); /** * Method wraps OpendaylightFlowTableStatisticsService.getFlowTablesStatistics @@ -177,7 +179,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllTablesStat(NodeRef nodeRef); + Future getAllTablesStat(NodeRef nodeRef); /** * Method wraps OpendaylightQueueStatisticsService.getAllQueuesStatisticsFromAllPorts @@ -185,7 +187,7 @@ public interface StatRpcMsgManager extends Runnable, AutoCloseable { * * @param NodeRef nodeRef */ - void getAllQueueStat(NodeRef nodeRef); + Future getAllQueueStat(NodeRef nodeRef); } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java index 831dc224d1..751a68965d 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java @@ -20,6 +20,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.me import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; @@ -97,7 +98,7 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen * * @param nodeIdent */ - void collectNextStatistics(InstanceIdentifier nodeIdent); + void collectNextStatistics(InstanceIdentifier nodeIdent, TransactionId xid); /** * Method wraps {@link StatPermCollector}.connectedNodeRegistration to provide diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractNotifyCommit.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractNotifyCommit.java index 6bc6a30f8f..3f0e5e430e 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractNotifyCommit.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractNotifyCommit.java @@ -84,9 +84,9 @@ public abstract class StatAbstractNotifyCommit i return manager.isProvidedFlowNodeActive(nodeIdent); } - protected void notifyToCollectNextStatistics(final InstanceIdentifier nodeIdent) { + protected void notifyToCollectNextStatistics(final InstanceIdentifier nodeIdent, final TransactionId xid) { Preconditions.checkNotNull(nodeIdent, "FlowCapableNode ident can not be null!"); - manager.collectNextStatistics(nodeIdent); + manager.collectNextStatistics(nodeIdent, xid); } /** diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java index 230425999e..e17c45dc76 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java @@ -216,7 +216,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit, StatNodeInfoHolder> statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); private volatile boolean wakeMe = false; private volatile boolean finishing = false; + private TransactionId actualTransactionId; public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr, final int maxNodeForCollectors) { @@ -77,7 +81,7 @@ public class StatPermCollectorImpl implements StatPermCollector { public void close() { statNodeHolder = Collections., StatNodeInfoHolder> emptyMap(); finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } @@ -134,7 +138,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } if (statNodeHolder.isEmpty()) { finishing = true; - collectNextStatistics(); + collectNextStatistics(actualTransactionId); statNetCollectorServ.shutdown(); } return true; @@ -172,12 +176,14 @@ public class StatPermCollectorImpl implements StatPermCollector { } @Override - public void collectNextStatistics() { - if (wakeMe) { - synchronized (statCollectorLock) { - if (wakeMe) { - LOG.trace("STAT-COLLECTOR is notified to conntinue"); - statCollectorLock.notify(); + public void collectNextStatistics(final TransactionId xid) { + if (checkTransactionId(xid)) { + if (wakeMe) { + synchronized (statCollectorLock) { + if (wakeMe) { + LOG.trace("STAT-COLLECTOR is notified to conntinue"); + statCollectorLock.notify(); + } } } } @@ -186,6 +192,8 @@ public class StatPermCollectorImpl implements StatPermCollector { @Override public void run() { try { + // sleep 5 second before collecting all statistics cycles is important + // for loading all Nodes to Operational/DS Thread.sleep(5000); } catch (final InterruptedException e1) { @@ -234,6 +242,7 @@ public class StatPermCollectorImpl implements StatPermCollector { } catch (final InterruptedException e) { LOG.warn("statCollector has been interrupted waiting stat Response sleep", e); } finally { + setActualTransactionId(null); wakeMe = false; } } @@ -249,49 +258,54 @@ public class StatPermCollectorImpl implements StatPermCollector { if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) { break; } - switch (statMarker) { - case PORT_STATS: - LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllPortsStat(actualNodeRef); - waitingForNotification(); - break; - case QUEUE_STATS: - LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllQueueStat(actualNodeRef); - waitingForNotification(); - break; - case TABLE_STATS: - LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllTablesStat(actualNodeRef); - waitingForNotification(); - break; - case GROUP_STATS: - LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef); - waitingForNotification(); - break; - case METER_STATS: - LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef); - waitingForNotification(); - manager.getRpcMsgManager().getAllMetersStat(actualNodeRef); - waitingForNotification(); - break; - case FLOW_STATS: - LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); - manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef); - waitingForNotification(); - LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); - for (short i = 0; i < maxTables; i++) { - final TableId tableId = new TableId(i); - manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + try { + switch (statMarker) { + case PORT_STATS: + LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case QUEUE_STATS: + LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get()); + waitingForNotification(); + break; + case TABLE_STATS: + LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get()); + waitingForNotification(); + break; + case GROUP_STATS: + LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get()); + waitingForNotification(); + break; + case METER_STATS: + LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get()); + waitingForNotification(); + setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get()); + waitingForNotification(); + break; + case FLOW_STATS: + LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef); + setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get()); + waitingForNotification(); + LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef); + for (short i = 0; i < maxTables; i++) { + final TableId tableId = new TableId(i); + manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId); + } + break; + default: + /* Exception for programmers in implementation cycle */ + throw new IllegalStateException("Not implemented ASK for " + statMarker); } - break; - default: - /* Exception for programmers in implementation cycle */ - throw new IllegalStateException("Not implemented ASK for " + statMarker); + } catch (InterruptedException | ExecutionException ex) { + LOG.warn("Unexpected RPC exception by call RPC Future!", ex); + continue; } } } @@ -333,5 +347,17 @@ public class StatPermCollectorImpl implements StatPermCollector { } return true; } + + private boolean checkTransactionId(final TransactionId xid) { + synchronized (transNotifyLock) { + return actualTransactionId != null && actualTransactionId.equals(xid); + } + } + + private void setActualTransactionId(final TransactionId transactionId) { + synchronized (transNotifyLock) { + actualTransactionId = transactionId; + } + } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java index 176e52708b..4870223c0f 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -40,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; @@ -82,7 +83,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final Cache> txCache; - private final long maxLifeForRequest = 50; /* 50 second */ private final int queueCapacity = 5000; private final OpendaylightGroupStatisticsService groupStatsService; @@ -97,7 +97,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private volatile boolean finishing = false; public StatRpcMsgManagerImpl (final StatisticsManager manager, - final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) { + final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); groupStatsService = Preconditions.checkNotNull( @@ -120,7 +120,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { "OpendaylightQueueStatisticsService can not be null!"); statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity); - txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS) + /* nr. 7 is here nr. of possible statistic which are waiting for notification + * - check it in StatPermCollectorImpl method collectStatCrossNetwork */ + txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS) .maximumSize(10000).build(); } @@ -163,7 +165,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void registrationRpcFutureCallBack( - final Future> future, final D inputObj, final NodeRef nodeRef) { + final Future> future, final D inputObj, final NodeRef nodeRef, + final SettableFuture resultTransId) { Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback>() { @@ -174,6 +177,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { if (id == null) { LOG.warn("No protocol support"); } else { + if (resultTransId != null) { + resultTransId.set(id); + } final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = @@ -264,8 +270,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsStat(final NodeRef nodeRef) { + public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { @Override @@ -274,16 +281,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllGroupStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getAllGroupStatistics(builder.build()), null, nodeRef); + .getAllGroupStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGroupStat); + return result; } @Override - public void getAllMetersStat(final NodeRef nodeRef) { + public Future getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { @Override @@ -292,16 +301,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterStatistics(builder.build()), null, nodeRef); + .getAllMeterStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllMeterStat); + return result; } @Override - public void getAllFlowsStat(final NodeRef nodeRef) { + public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { @Override @@ -310,11 +321,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowStatsService - .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef); + .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllFlowStat); + return result; } @Override @@ -334,7 +346,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { tbuilder.setId(tableId.getValue()); tbuilder.setKey(new TableKey(tableId.getValue())); registrationRpcFutureCallBack(flowStatsService - .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef); + .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); return null; } }; @@ -342,8 +354,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllPortsStat(final NodeRef nodeRef) { + public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { @Override @@ -351,17 +364,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { final GetAllNodeConnectorsStatisticsInputBuilder builder = new GetAllNodeConnectorsStatisticsInputBuilder(); builder.setNode(nodeRef); - registrationRpcFutureCallBack(portStatsService - .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef); + final Future> rpc = + portStatsService.getAllNodeConnectorsStatistics(builder.build()); + registrationRpcFutureCallBack(rpc, null, nodeRef, result); return null; } }; addGetAllStatJob(getAllPortsStat); + return result; } @Override - public void getAllTablesStat(final NodeRef nodeRef) { + public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { @Override @@ -370,16 +386,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetFlowTablesStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowTableStatsService - .getFlowTablesStatistics(builder.build()), null, nodeRef); + .getFlowTablesStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllTableStat); + return result; } @Override - public void getAllQueueStat(final NodeRef nodeRef) { + public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { @Override @@ -388,16 +406,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllQueuesStatisticsFromAllPortsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(queueStatsService - .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef); + .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllQueueStat); + return result; } @Override - public void getAllMeterConfigStat(final NodeRef nodeRef) { + public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { @Override @@ -406,11 +426,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterConfigStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterConfigStatistics(builder.build()), null, nodeRef); + .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(qetAllMeterConfStat); + return result; } @Override @@ -423,7 +444,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -440,7 +461,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -448,8 +469,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsConfStats(final NodeRef nodeRef) { + public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() { @Override @@ -458,12 +480,13 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetGroupDescriptionInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getGroupDescription(builder.build()), null, nodeRef); + .getGroupDescription(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGropConfStat); + return result; } public class TransactionCacheContainerImpl implements TransactionCacheContainer { diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java index edf9fad433..1d03e38c16 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java @@ -8,8 +8,15 @@ package org.opendaylight.controller.md.statistics.manager.impl; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; + import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -28,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.me import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; @@ -39,14 +47,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadFactory; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * statistics-manager @@ -90,8 +92,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private final StatisticsManagerConfig statManagerConfig; - public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) { - this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig); + public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) { + statManagerConfig = Preconditions.checkNotNull(statManagerconfig); this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!"); ThreadFactory threadFact; threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build(); @@ -105,7 +107,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { public void start(final NotificationProviderService notifService, final RpcConsumerRegistry rpcRegistry) { Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); - rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval()); + rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector()); statCollectors = Collections.emptyList(); nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService); flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService); @@ -247,10 +249,10 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { } @Override - public void collectNextStatistics(final InstanceIdentifier nodeIdent) { + public void collectNextStatistics(final InstanceIdentifier nodeIdent, final TransactionId xid) { for (final StatPermCollector collector : statCollectors) { if (collector.isProvidedFlowNodeActive(nodeIdent)) { - collector.collectNextStatistics(); + collector.collectNextStatistics(xid); } } } -- 2.36.6