X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatRpcMsgManagerImpl.java;h=20341bcc66e0b961f58e7c52b5122b35f38d3976;hp=80585548403f2ba2bf1b0e9498e26152acd6dace;hb=d80bf0f81bdeed907b290b67f26f1a3541ad3ea4;hpb=d45e906a590d8ee44ea7957179aa378515d6ba47 diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java index 8058554840..20341bcc66 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.md.statistics.manager.impl; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -15,10 +16,8 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; -import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; @@ -42,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; @@ -51,6 +51,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; @@ -84,9 +85,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final Cache> txCache; - private final long maxLifeForRequest = 50; /* 50 second */ private final int queueCapacity = 5000; - private final StatisticsManager manager; private final OpendaylightGroupStatisticsService groupStatsService; private final OpendaylightMeterStatisticsService meterStatsService; @@ -100,8 +99,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private volatile boolean finishing = false; public StatRpcMsgManagerImpl (final StatisticsManager manager, - final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) { - this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!"); + final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { + Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); groupStatsService = Preconditions.checkNotNull( rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class), @@ -123,7 +122,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { "OpendaylightQueueStatisticsService can not be null!"); statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity); - txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS) + /* nr. 7 is here nr. of possible statistic which are waiting for notification + * - check it in StatPermCollectorImpl method collectStatCrossNetwork */ + txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS) .maximumSize(10000).build(); } @@ -166,7 +167,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void registrationRpcFutureCallBack( - final Future> future, final D inputObj, final NodeRef nodeRef) { + final Future> future, final D inputObj, final NodeRef nodeRef, + final SettableFuture resultTransId) { Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), new FutureCallback>() { @@ -174,10 +176,15 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void onSuccess(final RpcResult result) { final TransactionId id = result.getResult().getTransactionId(); + final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); if (id == null) { - LOG.warn("No protocol support"); + String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})"); + LOG.warn("Node [{}] does not support statistics request type : {}", + nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2))); } else { - final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); + if (resultTransId != null) { + resultTransId.set(id); + } final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId()); @@ -267,8 +274,9 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsStat(final NodeRef nodeRef) { + public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { @Override @@ -277,16 +285,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllGroupStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getAllGroupStatistics(builder.build()), null, nodeRef); + .getAllGroupStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGroupStat); + return result; } @Override - public void getAllMetersStat(final NodeRef nodeRef) { + public Future getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { @Override @@ -295,16 +305,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterStatistics(builder.build()), null, nodeRef); + .getAllMeterStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllMeterStat); + return result; } @Override - public void getAllFlowsStat(final NodeRef nodeRef) { + public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { @Override @@ -313,44 +325,42 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowStatsService - .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef); + .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllFlowStat); + return result; } @Override public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) { - - manager.enqueue(new StatDataStoreOperation() { + Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + Preconditions.checkArgument(tableId != null, "TableId can not be null!"); + final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { @Override - public void applyOperation(final ReadWriteTransaction tx) { - final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { - @Override - public Void call() throws Exception { - final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = - new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - builder.setNode(nodeRef); - builder.setTableId(tableId); - - final TableBuilder tbuilder = new TableBuilder(); - tbuilder.setId(tableId.getValue()); - tbuilder.setKey(new TableKey(tableId.getValue())); - registrationRpcFutureCallBack(flowStatsService - .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef); - return null; - } - }; - addGetAllStatJob(getAggregateFlowStat); + public Void call() throws Exception { + final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = + new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); + builder.setNode(nodeRef); + builder.setTableId(tableId); + + final TableBuilder tbuilder = new TableBuilder(); + tbuilder.setId(tableId.getValue()); + tbuilder.setKey(new TableKey(tableId.getValue())); + registrationRpcFutureCallBack(flowStatsService + .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); + return null; } - }); + }; + addGetAllStatJob(getAggregateFlowStat); } @Override - public void getAllPortsStat(final NodeRef nodeRef) { + public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { @Override @@ -358,17 +368,20 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { final GetAllNodeConnectorsStatisticsInputBuilder builder = new GetAllNodeConnectorsStatisticsInputBuilder(); builder.setNode(nodeRef); - registrationRpcFutureCallBack(portStatsService - .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef); + final Future> rpc = + portStatsService.getAllNodeConnectorsStatistics(builder.build()); + registrationRpcFutureCallBack(rpc, null, nodeRef, result); return null; } }; addGetAllStatJob(getAllPortsStat); + return result; } @Override - public void getAllTablesStat(final NodeRef nodeRef) { + public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { @Override @@ -377,16 +390,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetFlowTablesStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(flowTableStatsService - .getFlowTablesStatistics(builder.build()), null, nodeRef); + .getFlowTablesStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllTableStat); + return result; } @Override - public void getAllQueueStat(final NodeRef nodeRef) { + public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { @Override @@ -395,16 +410,18 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllQueuesStatisticsFromAllPortsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(queueStatsService - .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef); + .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllQueueStat); + return result; } @Override - public void getAllMeterConfigStat(final NodeRef nodeRef) { + public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { @Override @@ -413,11 +430,12 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { new GetAllMeterConfigStatisticsInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(meterStatsService - .getAllMeterConfigStatistics(builder.build()), null, nodeRef); + .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(qetAllMeterConfStat); + return result; } @Override @@ -430,7 +448,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -447,7 +465,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { /* RPC input */ final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); input.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef); + registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); return null; } }; @@ -455,23 +473,24 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } @Override - public void getAllGroupsConfStats(final NodeRef nodeRef) { + public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); + final SettableFuture result = SettableFuture.create(); final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() { @Override public Void call() throws Exception { - Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!"); final GetGroupDescriptionInputBuilder builder = new GetGroupDescriptionInputBuilder(); builder.setNode(nodeRef); registrationRpcFutureCallBack(groupStatsService - .getGroupDescription(builder.build()), null, nodeRef); + .getGroupDescription(builder.build()), null, nodeRef, result); return null; } }; addGetAllStatJob(getAllGropConfStat); + return result; } public class TransactionCacheContainerImpl implements TransactionCacheContainer {