X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatRpcMsgManagerImpl.java;h=4870223c0ff4506e01cb9614b4ef209981ab85fd;hb=dbd3524a5cdd27dd49fd09f1cf4ff0ff40e85c5e;hp=80585548403f2ba2bf1b0e9498e26152acd6dace;hpb=d45e906a590d8ee44ea7957179aa378515d6ba47;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java index 8058554840..4870223c0f 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -15,10 +15,8 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; -import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; @@ -42,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; @@ -84,9 +83,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final Cache> txCache; - private final long maxLifeForRequest = 50; /* 50 second */ private final int queueCapacity = 5000; - private final StatisticsManager manager; private final OpendaylightGroupStatisticsService groupStatsService; private final OpendaylightMeterStatisticsService meterStatsService; @@ -100,8 +97,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private volatile boolean finishing = false; public StatRpcMsgManagerImpl (final StatisticsManager manager, - final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) { - this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!"); + final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { + Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); groupStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class), @@ -123,7 +120,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { "OpendaylightQueueStatisticsService can not be null!"); statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity); - txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS) + /* nr. 7 is here nr. of possible statistic which are waiting for notification + * - check it in StatPermCollectorImpl method collectStatCrossNetwork */ + txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS) .maximumSize(10000).build(); } @@ -166,7 +165,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void registrationRpcFutureCallBack( - final Future> future, final D inputObj, final NodeRef nodeRef) { + final Future> future, final D inputObj, final NodeRef nodeRef, + final SettableFuture resultTransId) { Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback>() { @@ -177,6 +177,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { if (id == null) { LOG.warn("No protocol support"); } else { + if (resultTransId != null) { + resultTransId.set(id); + } final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = @@ -267,8 +270,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsStat(final NodeRef nodeRef) { + public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { @Override @@ -277,16 +281,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllGroupStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getAllGroupStatistics(builder.build()), null, nodeRef); + .getAllGroupStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGroupStat); + return result; } @Override - public void getAllMetersStat(final NodeRef nodeRef) { + public Future getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { @Override @@ -295,16 +301,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterStatistics(builder.build()), null, nodeRef); + .getAllMeterStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllMeterStat); + return result; } @Override - public void getAllFlowsStat(final NodeRef nodeRef) { + public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { @Override @@ -313,44 +321,42 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowStatsService - .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef); + .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllFlowStat); + return result; } @Override public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) { - - manager.enqueue(new StatDataStoreOperation() { + Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + Preconditions.checkArgument(tableId != null, "TableId can not be null!"); + final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { @Override - public void applyOperation(final ReadWriteTransaction tx) { - final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { - @Override - public Void call() throws Exception { - final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = - new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - builder.setNode(nodeRef); - builder.setTableId(tableId); - - final TableBuilder tbuilder = new TableBuilder(); - tbuilder.setId(tableId.getValue()); - tbuilder.setKey(new TableKey(tableId.getValue())); - registrationRpcFutureCallBack(flowStatsService - .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef); - return null; - } - }; - addGetAllStatJob(getAggregateFlowStat); + public Void call() throws Exception { + final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = + new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); + builder.setNode(nodeRef); + builder.setTableId(tableId); + + final TableBuilder tbuilder = new TableBuilder(); + tbuilder.setId(tableId.getValue()); + tbuilder.setKey(new TableKey(tableId.getValue())); + registrationRpcFutureCallBack(flowStatsService + .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); + return null; } - }); + }; + addGetAllStatJob(getAggregateFlowStat); } @Override - public void getAllPortsStat(final NodeRef nodeRef) { + public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { @Override @@ -358,17 +364,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { final GetAllNodeConnectorsStatisticsInputBuilder builder = new GetAllNodeConnectorsStatisticsInputBuilder(); builder.setNode(nodeRef); - registrationRpcFutureCallBack(portStatsService - .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef); + final Future> rpc = + portStatsService.getAllNodeConnectorsStatistics(builder.build()); + registrationRpcFutureCallBack(rpc, null, nodeRef, result); return null; } }; addGetAllStatJob(getAllPortsStat); + return result; } @Override - public void getAllTablesStat(final NodeRef nodeRef) { + public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { @Override @@ -377,16 +386,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetFlowTablesStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowTableStatsService - .getFlowTablesStatistics(builder.build()), null, nodeRef); + .getFlowTablesStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllTableStat); + return result; } @Override - public void getAllQueueStat(final NodeRef nodeRef) { + public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { @Override @@ -395,16 +406,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllQueuesStatisticsFromAllPortsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(queueStatsService - .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef); + .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllQueueStat); + return result; } @Override - public void getAllMeterConfigStat(final NodeRef nodeRef) { + public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { @Override @@ -413,11 +426,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterConfigStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterConfigStatistics(builder.build()), null, nodeRef); + .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(qetAllMeterConfStat); + return result; } @Override @@ -430,7 +444,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -447,7 +461,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -455,23 +469,24 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsConfStats(final NodeRef nodeRef) { + public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() { @Override public Void call() throws Exception { - Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final GetGroupDescriptionInputBuilder builder = new GetGroupDescriptionInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getGroupDescription(builder.build()), null, nodeRef); + .getGroupDescription(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGropConfStat); + return result; } public class TransactionCacheContainerImpl implements TransactionCacheContainer {