import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
- private final long maxLifeForRequest = 50; /* 50 second */
private final int queueCapacity = 5000;
- private final StatisticsManager manager;
private final OpendaylightGroupStatisticsService groupStatsService;
private final OpendaylightMeterStatisticsService meterStatsService;
private volatile boolean finishing = false;
public StatRpcMsgManagerImpl (final StatisticsManager manager,
- final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
- this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
+ final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
+ Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
groupStatsService = Preconditions.checkNotNull(
rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
"OpendaylightQueueStatisticsService can not be null!");
statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
- txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+ /* nr. 7 is here nr. of possible statistic which are waiting for notification
+ * - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+ txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
.maximumSize(10000).build();
}
@Override
public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
- final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+ final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+ final SettableFuture<TransactionId> resultTransId) {
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
new FutureCallback<RpcResult<? extends TransactionAware>>() {
if (id == null) {
LOG.warn("No protocol support");
} else {
+ if (resultTransId != null) {
+ resultTransId.set(id);
+ }
final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
final String cacheKey = buildCacheKey(id, nodeKey.getId());
final TransactionCacheContainer<? super TransactionAware> container =
}
@Override
- public void getAllGroupsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
@Override
new GetAllGroupStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
- .getAllGroupStatistics(builder.build()), null, nodeRef);
+ .getAllGroupStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGroupStat);
+ return result;
}
@Override
- public void getAllMetersStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
@Override
new GetAllMeterStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
- .getAllMeterStatistics(builder.build()), null, nodeRef);
+ .getAllMeterStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllMeterStat);
+ return result;
}
@Override
- public void getAllFlowsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
@Override
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowStatsService
- .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+ .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllFlowStat);
+ return result;
}
@Override
public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
-
- manager.enqueue(new StatDataStoreOperation() {
+ Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ Preconditions.checkArgument(tableId != null, "TableId can not be null!");
+ final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
@Override
- public void applyOperation(final ReadWriteTransaction tx) {
- final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
- @Override
- public Void call() throws Exception {
- final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
- new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
- builder.setNode(nodeRef);
- builder.setTableId(tableId);
-
- final TableBuilder tbuilder = new TableBuilder();
- tbuilder.setId(tableId.getValue());
- tbuilder.setKey(new TableKey(tableId.getValue()));
- registrationRpcFutureCallBack(flowStatsService
- .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
- return null;
- }
- };
- addGetAllStatJob(getAggregateFlowStat);
+ public Void call() throws Exception {
+ final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+ new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+ builder.setNode(nodeRef);
+ builder.setTableId(tableId);
+
+ final TableBuilder tbuilder = new TableBuilder();
+ tbuilder.setId(tableId.getValue());
+ tbuilder.setKey(new TableKey(tableId.getValue()));
+ registrationRpcFutureCallBack(flowStatsService
+ .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
+ return null;
}
- });
+ };
+ addGetAllStatJob(getAggregateFlowStat);
}
@Override
- public void getAllPortsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
@Override
final GetAllNodeConnectorsStatisticsInputBuilder builder =
new GetAllNodeConnectorsStatisticsInputBuilder();
builder.setNode(nodeRef);
- registrationRpcFutureCallBack(portStatsService
- .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+ final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+ portStatsService.getAllNodeConnectorsStatistics(builder.build());
+ registrationRpcFutureCallBack(rpc, null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllPortsStat);
+ return result;
}
@Override
- public void getAllTablesStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
@Override
new GetFlowTablesStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowTableStatsService
- .getFlowTablesStatistics(builder.build()), null, nodeRef);
+ .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllTableStat);
+ return result;
}
@Override
- public void getAllQueueStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
@Override
new GetAllQueuesStatisticsFromAllPortsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(queueStatsService
- .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+ .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllQueueStat);
+ return result;
}
@Override
- public void getAllMeterConfigStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
@Override
new GetAllMeterConfigStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
- .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+ .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(qetAllMeterConfStat);
+ return result;
}
@Override
/* RPC input */
final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
input.setNode(nodeRef);
- registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+ registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
return null;
}
};
/* RPC input */
final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
input.setNode(nodeRef);
- registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+ registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
return null;
}
};
}
@Override
- public void getAllGroupsConfStats(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
@Override
public Void call() throws Exception {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
final GetGroupDescriptionInputBuilder builder =
new GetGroupDescriptionInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
- .getGroupDescription(builder.build()), null, nodeRef);
+ .getGroupDescription(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGropConfStat);
+ return result;
}
public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {