X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatRpcMsgManagerImpl.java;h=4870223c0ff4506e01cb9614b4ef209981ab85fd;hb=dbd3524a5cdd27dd49fd09f1cf4ff0ff40e85c5e;hp=176e52708b0a9a7c8ce91710ad2631285904a283;hpb=47ad11bd477096d9ffcf568e071ba68baabdbe6e;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java index 176e52708b..4870223c0f 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -40,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; @@ -82,7 +83,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final Cache> txCache; - private final long maxLifeForRequest = 50; /* 50 second */ private final int queueCapacity = 5000; private final OpendaylightGroupStatisticsService groupStatsService; @@ -97,7 +97,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private volatile boolean finishing = false; public StatRpcMsgManagerImpl (final StatisticsManager manager, - final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) { + final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); groupStatsService = Preconditions.checkNotNull( @@ -120,7 +120,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { "OpendaylightQueueStatisticsService can not be null!"); statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity); - txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS) + /* nr. 7 is here nr. of possible statistic which are waiting for notification + * - check it in StatPermCollectorImpl method collectStatCrossNetwork */ + txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS) .maximumSize(10000).build(); } @@ -163,7 +165,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void registrationRpcFutureCallBack( - final Future> future, final D inputObj, final NodeRef nodeRef) { + final Future> future, final D inputObj, final NodeRef nodeRef, + final SettableFuture resultTransId) { Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback>() { @@ -174,6 +177,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { if (id == null) { LOG.warn("No protocol support"); } else { + if (resultTransId != null) { + resultTransId.set(id); + } final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = @@ -264,8 +270,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsStat(final NodeRef nodeRef) { + public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { @Override @@ -274,16 +281,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllGroupStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getAllGroupStatistics(builder.build()), null, nodeRef); + .getAllGroupStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGroupStat); + return result; } @Override - public void getAllMetersStat(final NodeRef nodeRef) { + public Future getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { @Override @@ -292,16 +301,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterStatistics(builder.build()), null, nodeRef); + .getAllMeterStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllMeterStat); + return result; } @Override - public void getAllFlowsStat(final NodeRef nodeRef) { + public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { @Override @@ -310,11 +321,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowStatsService - .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef); + .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllFlowStat); + return result; } @Override @@ -334,7 +346,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { tbuilder.setId(tableId.getValue()); tbuilder.setKey(new TableKey(tableId.getValue())); registrationRpcFutureCallBack(flowStatsService - .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef); + .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); return null; } }; @@ -342,8 +354,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllPortsStat(final NodeRef nodeRef) { + public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { @Override @@ -351,17 +364,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { final GetAllNodeConnectorsStatisticsInputBuilder builder = new GetAllNodeConnectorsStatisticsInputBuilder(); builder.setNode(nodeRef); - registrationRpcFutureCallBack(portStatsService - .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef); + final Future> rpc = + portStatsService.getAllNodeConnectorsStatistics(builder.build()); + registrationRpcFutureCallBack(rpc, null, nodeRef, result); return null; } }; addGetAllStatJob(getAllPortsStat); + return result; } @Override - public void getAllTablesStat(final NodeRef nodeRef) { + public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { @Override @@ -370,16 +386,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetFlowTablesStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowTableStatsService - .getFlowTablesStatistics(builder.build()), null, nodeRef); + .getFlowTablesStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllTableStat); + return result; } @Override - public void getAllQueueStat(final NodeRef nodeRef) { + public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { @Override @@ -388,16 +406,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllQueuesStatisticsFromAllPortsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(queueStatsService - .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef); + .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllQueueStat); + return result; } @Override - public void getAllMeterConfigStat(final NodeRef nodeRef) { + public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { @Override @@ -406,11 +426,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterConfigStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterConfigStatistics(builder.build()), null, nodeRef); + .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(qetAllMeterConfStat); + return result; } @Override @@ -423,7 +444,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -440,7 +461,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -448,8 +469,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsConfStats(final NodeRef nodeRef) { + public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() { @Override @@ -458,12 +480,13 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetGroupDescriptionInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getGroupDescription(builder.build()), null, nodeRef); + .getGroupDescription(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGropConfStat); + return result; } public class TransactionCacheContainerImpl implements TransactionCacheContainer {