package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
+import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupFeaturesInputBuilder;
*/
public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
- private final static Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatRpcMsgManagerImpl.class);
private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
- private final int queueCapacity = 5000;
+ /**
+ * Cache for futures to be returned by
+ * {@link #isExpectedStatistics(TransactionId, NodeId)}.
+ */
+ private final Cache<String, SettableFuture<Boolean>> txFutureCache;
+
+ /**
+ * The number of seconds to wait for transaction container to be put into
+ * {@link #txCache}.
+ */
+ private static final long TXCACHE_WAIT_TIMEOUT = 10L;
+
+ private static final int MAX_CACHE_SIZE = 10000;
+
+ private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!";
+ private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!";
+ private static final String MSG_NODE_REF_NOT_NULL = "NodeRef can not be null!";
+ /**
+ * Number of possible statistic which are waiting for notification
+ * - check it in StatPermCollectorImpl method collectStatCrossNetwork()
+ */
+ private static final long POSSIBLE_STAT_WAIT_FOR_NOTIFICATION = 7;
private final OpendaylightGroupStatisticsService groupStatsService;
private final OpendaylightMeterStatisticsService meterStatsService;
private final OpendaylightFlowTableStatisticsService flowTableStatsService;
private final OpendaylightQueueStatisticsService queueStatsService;
- private BlockingQueue<RpcJobsQueue> statsRpcJobQueue;
-
- private volatile boolean finishing = false;
-
public StatRpcMsgManagerImpl (final StatisticsManager manager,
final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class),
"OpendaylightQueueStatisticsService can not be null!");
- statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
- /* nr. 7 is here nr. of possible statistic which are waiting for notification
- * - check it in StatPermCollectorImpl method collectStatCrossNetwork */
- txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
- .maximumSize(10000).build();
- }
-
- @Override
- public void close() {
- finishing = true;
- statsRpcJobQueue = null;
- }
-
- @Override
- public void run() {
- /* Neverending cyle - wait for finishing */
- while ( ! finishing) {
- try {
- statsRpcJobQueue.take().call();
- }
- catch (final Exception e) {
- LOG.warn("Stat Element RPC executor fail!", e);
- }
- }
- // Drain all rpcCall, making sure any blocked threads are unblocked
- while ( ! statsRpcJobQueue.isEmpty()) {
- statsRpcJobQueue.poll();
- }
- }
-
- private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) {
- final boolean success = statsRpcJobQueue.offer(getAllStatJob);
- if ( ! success) {
- LOG.warn("Put RPC request getAllStat fail! Queue is full.");
- }
- }
-
- private void addStatJob(final RpcJobsQueue getStatJob) {
- final boolean success = statsRpcJobQueue.offer(getStatJob);
- if ( ! success) {
- LOG.debug("Put RPC request for getStat fail! Queue is full.");
- }
+ txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS)
+ .maximumSize(MAX_CACHE_SIZE).build();
+ txFutureCache = CacheBuilder.newBuilder().
+ expireAfterWrite(TXCACHE_WAIT_TIMEOUT, TimeUnit.SECONDS).
+ maximumSize(MAX_CACHE_SIZE).build();
}
@Override
final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
final SettableFuture<TransactionId> resultTransId) {
- Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
- new FutureCallback<RpcResult<? extends TransactionAware>>() {
-
+ class FutureCallbackImpl implements FutureCallback<RpcResult<? extends TransactionAware>> {
@Override
public void onSuccess(final RpcResult<? extends TransactionAware> result) {
final TransactionId id = result.getResult().getTransactionId();
String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
LOG.warn("Node [{}] does not support statistics request type : {}",
nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
+ if (resultTransId != null) {
+ resultTransId.setException(
+ new UnsupportedOperationException());
+ }
} else {
if (resultTransId != null) {
resultTransId.set(id);
final String cacheKey = buildCacheKey(id, nodeKey.getId());
final TransactionCacheContainer<? super TransactionAware> container =
new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
- txCache.put(cacheKey, container);
+ putTransaction(cacheKey, container);
}
}
@Override
public void onFailure(final Throwable t) {
LOG.warn("Response Registration for Statistics RPC call fail!", t);
+ if (resultTransId != null) {
+ if (t instanceof DOMRpcImplementationNotAvailableException) {
+ //If encountered with RPC not availabe exception, retry till
+ // stats manager remove the node from the stats collector pool
+ resultTransId.set(StatPermCollectorImpl.getFakeTxId());
+ } else {
+ resultTransId.setException(t);
+ }
+ }
}
+ }
- });
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallbackImpl());
}
private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
}
+ /**
+ * Put the given statistics transaction container into the cache.
+ *
+ * @param key Key that specifies the given transaction container.
+ * @param container Transaction container.
+ */
+ private synchronized void putTransaction(
+ String key, TransactionCacheContainer<? super TransactionAware> container) {
+ txCache.put(key, container);
+
+ SettableFuture<Boolean> future = txFutureCache.asMap().remove(key);
+ if (future != null) {
+ // Wake up a thread waiting for this transaction container.
+ future.set(true);
+ }
+ }
+
+ /**
+ * Check to see if the specified transaction container is cached in
+ * {@link #txCache}.
+ *
+ * @param key Key that specifies the transaction container.
+ * @return A future that will contain the result.
+ */
+ private synchronized Future<Boolean> isExpectedStatistics(String key) {
+ Future<Boolean> future;
+ TransactionCacheContainer<?> container = txCache.getIfPresent(key);
+ if (container == null) {
+ // Wait for the transaction container to be put into the cache.
+ SettableFuture<Boolean> f = SettableFuture.<Boolean>create();
+ SettableFuture<Boolean> current =
+ txFutureCache.asMap().putIfAbsent(key, f);
+ future = (current == null) ? f : current;
+ } else {
+ future = Futures.immediateFuture(Boolean.TRUE);
+ }
+
+ return future;
+ }
+
@Override
public Future<Optional<TransactionCacheContainer<?>>> getTransactionCacheContainer(
final TransactionId id, final NodeId nodeId) {
- Preconditions.checkArgument(id != null, "TransactionId can not be null!");
- Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
- final String key = buildCacheKey(id, nodeId);
- final SettableFuture<Optional<TransactionCacheContainer<?>>> result = SettableFuture.create();
-
- final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() {
+ Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
+ Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
+
+ String key = buildCacheKey(id, nodeId);
+ Optional<TransactionCacheContainer<?>> resultContainer =
+ Optional.<TransactionCacheContainer<?>> fromNullable(
+ txCache.asMap().remove(key));
+ if (!resultContainer.isPresent()) {
+ LOG.warn("Transaction cache not found: {}", key);
+ }
- @Override
- public Void call() throws Exception {
- final Optional<TransactionCacheContainer<?>> resultContainer =
- Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
- if (resultContainer.isPresent()) {
- txCache.invalidate(key);
- }
- result.set(resultContainer);
- return null;
- }
- };
- addStatJob(getTransactionCacheContainer);
- return result;
+ return Futures.immediateFuture(resultContainer);
}
@Override
public Future<Boolean> isExpectedStatistics(final TransactionId id, final NodeId nodeId) {
- Preconditions.checkArgument(id != null, "TransactionId can not be null!");
- Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
- final String key = buildCacheKey(id, nodeId);
- final SettableFuture<Boolean> checkStatId = SettableFuture.create();
+ Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL);
+ Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
- final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final Optional<TransactionCacheContainer<?>> result =
- Optional.<TransactionCacheContainer<?>> fromNullable(txCache.getIfPresent(key));
- checkStatId.set(Boolean.valueOf(result.isPresent()));
- return null;
- }
- };
- addStatJob(isExpecedStatistics);
- return checkStatId;
+ String key = buildCacheKey(id, nodeId);
+ return isExpectedStatistics(key);
}
@Override
public void addNotification(final TransactionAware notification, final NodeId nodeId) {
Preconditions.checkArgument(notification != null, "TransactionAware can not be null!");
- Preconditions.checkArgument(nodeId != null, "NodeId can not be null!");
-
- final RpcJobsQueue addNotification = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final TransactionId txId = notification.getTransactionId();
- final String key = buildCacheKey(txId, nodeId);
- final TransactionCacheContainer<? super TransactionAware> container = (txCache.getIfPresent(key));
- if (container != null) {
- container.addNotif(notification);
- }
- return null;
- }
- };
- addStatJob(addNotification);
+ Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL);
+
+ TransactionId txId = notification.getTransactionId();
+ String key = buildCacheKey(txId, nodeId);
+ TransactionCacheContainer<? super TransactionAware> container =
+ txCache.getIfPresent(key);
+ if (container != null) {
+ container.addNotif(notification);
+ } else {
+ LOG.warn("Unable to add notification: {}, {}", key,
+ notification.getImplementedInterface());
+ }
}
@Override
public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllGroupStatisticsInputBuilder builder =
- new GetAllGroupStatisticsInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(groupStatsService
- .getAllGroupStatistics(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllGroupStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllGroupStatisticsInputBuilder builder =
+ new GetAllGroupStatisticsInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ groupStatsService.getAllGroupStatistics(builder.build()), null,
+ nodeRef, result);
return result;
}
@Override
public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllMeterStatisticsInputBuilder builder =
- new GetAllMeterStatisticsInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(meterStatsService
- .getAllMeterStatistics(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllMeterStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllMeterStatisticsInputBuilder builder =
+ new GetAllMeterStatisticsInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ meterStatsService.getAllMeterStatistics(builder.build()), null,
+ nodeRef, result);
return result;
}
@Override
public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
- new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(flowStatsService
- .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllFlowStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
+ new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ flowStatsService.getAllFlowsStatisticsFromAllFlowTables(builder.build()),
+ null, nodeRef, result);
return result;
}
@Override
public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
Preconditions.checkArgument(tableId != null, "TableId can not be null!");
- final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
- new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
- builder.setNode(nodeRef);
- builder.setTableId(tableId);
-
- final TableBuilder tbuilder = new TableBuilder();
- tbuilder.setId(tableId.getValue());
- tbuilder.setKey(new TableKey(tableId.getValue()));
- registrationRpcFutureCallBack(flowStatsService
- .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
- return null;
- }
- };
- addGetAllStatJob(getAggregateFlowStat);
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+ new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+ builder.setNode(nodeRef).setTableId(tableId);
+
+ TableBuilder tbuilder = new TableBuilder().
+ setId(tableId.getValue()).
+ setKey(new TableKey(tableId.getValue()));
+ registrationRpcFutureCallBack(
+ flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()),
+ tbuilder.build(), nodeRef, null);
}
@Override
public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllNodeConnectorsStatisticsInputBuilder builder =
- new GetAllNodeConnectorsStatisticsInputBuilder();
- builder.setNode(nodeRef);
- final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
- portStatsService.getAllNodeConnectorsStatistics(builder.build());
- registrationRpcFutureCallBack(rpc, null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllPortsStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllNodeConnectorsStatisticsInputBuilder builder =
+ new GetAllNodeConnectorsStatisticsInputBuilder();
+ builder.setNode(nodeRef);
+ Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+ portStatsService.getAllNodeConnectorsStatistics(builder.build());
+ registrationRpcFutureCallBack(rpc, null, nodeRef, result);
return result;
}
@Override
public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetFlowTablesStatisticsInputBuilder builder =
- new GetFlowTablesStatisticsInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(flowTableStatsService
- .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllTableStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetFlowTablesStatisticsInputBuilder builder =
+ new GetFlowTablesStatisticsInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ flowTableStatsService.getFlowTablesStatistics(builder.build()),
+ null, nodeRef, result);
return result;
}
@Override
public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
- new GetAllQueuesStatisticsFromAllPortsInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(queueStatsService
- .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(getAllQueueStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllQueuesStatisticsFromAllPortsInputBuilder builder =
+ new GetAllQueuesStatisticsFromAllPortsInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ queueStatsService.getAllQueuesStatisticsFromAllPorts(builder.build()),
+ null, nodeRef, result);
return result;
}
@Override
public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetAllMeterConfigStatisticsInputBuilder builder =
- new GetAllMeterConfigStatisticsInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(meterStatsService
- .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
- return null;
- }
- };
- addGetAllStatJob(qetAllMeterConfStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetAllMeterConfigStatisticsInputBuilder builder =
+ new GetAllMeterConfigStatisticsInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ meterStatsService.getAllMeterConfigStatistics(builder.build()),
+ null, nodeRef, result);
return result;
}
@Override
public void getGroupFeaturesStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- /* RPC input */
- final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
- input.setNode(nodeRef);
- registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
- return null;
- }
- };
- addStatJob(getGroupFeaturesStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder().
+ setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ groupStatsService.getGroupFeatures(input.build()), null, nodeRef,
+ null);
}
@Override
public void getMeterFeaturesStat(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- /* RPC input */
- final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
- input.setNode(nodeRef);
- registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
- return null;
- }
- };
- addStatJob(getMeterFeaturesStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder().
+ setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ meterStatsService.getMeterFeatures(input.build()), null, nodeRef,
+ null);
}
@Override
public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
- Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
- final SettableFuture<TransactionId> result = SettableFuture.create();
- final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
-
- @Override
- public Void call() throws Exception {
- final GetGroupDescriptionInputBuilder builder =
- new GetGroupDescriptionInputBuilder();
- builder.setNode(nodeRef);
- registrationRpcFutureCallBack(groupStatsService
- .getGroupDescription(builder.build()), null, nodeRef, result);
-
- return null;
- }
- };
- addGetAllStatJob(getAllGropConfStat);
+ Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL);
+ SettableFuture<TransactionId> result = SettableFuture.create();
+ GetGroupDescriptionInputBuilder builder =
+ new GetGroupDescriptionInputBuilder();
+ builder.setNode(nodeRef);
+ registrationRpcFutureCallBack(
+ groupStatsService.getGroupDescription(builder.build()), null,
+ nodeRef, result);
return result;
}
private final Optional<? extends DataObject> confInput;
public <D extends DataObject> TransactionCacheContainerImpl (final TransactionId id, final D input, final NodeId nodeId) {
- this.id = Preconditions.checkNotNull(id, "TransactionId can not be null!");
+ this.id = Preconditions.checkNotNull(id, MSG_TRANS_ID_NOT_NULL);
notifications = new CopyOnWriteArrayList<T>();
confInput = Optional.fromNullable(input);
nId = nodeId;
}
}
}
-