* fix timeout value for statWaiter to notification (30 sec is mistake - 3sec is correct value)
* add check TransactionId for every notification (prevent unexpected notification for collecting next statistics)
* timeout has to clear TransactionId (prevention for notification from slower statistics processes
* patch 3 - revert the log level msg (debuging issue in StatPermCollectorImpl)
- change an expiration calculation for cached RPC results (StatRpcMsgManagerImpl)
- fix conditions for call notifyToCollectNextStat (Meter, Group)
succesfull tested for karaf-compatible
Change-Id: I54d7fe9e5c1a5d265c9378507fce1163691b62e5
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
import java.util.List;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* It is call from collecting allStatistics methods as a future result for
* Operational/DS statistic store call (does not matter in the outcome).
*/
- void collectNextStatistics();
+ void collectNextStatistics(TransactionId xid);
/**
* Method returns true if collector has registered some active nodes
import org.opendaylight.yangtools.yang.common.RpcResult;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.SettableFuture;
/**
* statistics-manager
*
* @param future - result every Device RPC call
*/
- <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(Future<RpcResult<T>> future, D inputObj, NodeRef ref);
+ <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
+ Future<RpcResult<T>> future, D inputObj, NodeRef ref, SettableFuture<TransactionId> resultTransId);
/**
* Method adds Notification which is marked as Multipart to the transaction cash
*
* @param NodeRef nodeRef
*/
- void getAllGroupsStat(NodeRef nodeRef);
+ Future<TransactionId> getAllGroupsStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightGroupStatisticsService.getGroupDescription
*
* @param NodeRef nodeRef
*/
- void getAllGroupsConfStats(NodeRef nodeRef);
+ Future<TransactionId> getAllGroupsConfStats(NodeRef nodeRef);
/**
* Method wraps OpendaylightMeterStatisticsService.getGroupFeatures
*
* @param NodeRef nodeRef
*/
- void getAllMetersStat(NodeRef nodeRef);
+ Future<TransactionId> getAllMetersStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightMeterStatisticsService.getAllMeterConfigStatistics
*
* @param NodeRef nodeRef
*/
- void getAllMeterConfigStat(NodeRef nodeRef);
+ Future<TransactionId> getAllMeterConfigStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightMeterStatisticsService.getMeterFeatures
*
* @param NodeRef nodeRef
*/
- void getAllFlowsStat(NodeRef nodeRef);
+ Future<TransactionId> getAllFlowsStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightFlowStatisticsService.getAggregateFlowStatisticsFromFlowTableForAllFlows
*
* @param NodeRef nodeRef
*/
- void getAllPortsStat(NodeRef nodeRef);
+ Future<TransactionId> getAllPortsStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightFlowTableStatisticsService.getFlowTablesStatistics
*
* @param NodeRef nodeRef
*/
- void getAllTablesStat(NodeRef nodeRef);
+ Future<TransactionId> getAllTablesStat(NodeRef nodeRef);
/**
* Method wraps OpendaylightQueueStatisticsService.getAllQueuesStatisticsFromAllPorts
*
* @param NodeRef nodeRef
*/
- void getAllQueueStat(NodeRef nodeRef);
+ Future<TransactionId> getAllQueueStat(NodeRef nodeRef);
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
*
* @param nodeIdent
*/
- void collectNextStatistics(InstanceIdentifier<Node> nodeIdent);
+ void collectNextStatistics(InstanceIdentifier<Node> nodeIdent, TransactionId xid);
/**
* Method wraps {@link StatPermCollector}.connectedNodeRegistration to provide
return manager.isProvidedFlowNodeActive(nodeIdent);
}
- protected void notifyToCollectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
+ protected void notifyToCollectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
Preconditions.checkNotNull(nodeIdent, "FlowCapableNode ident can not be null!");
- manager.collectNextStatistics(nodeIdent);
+ manager.collectNextStatistics(nodeIdent, xid);
}
/**
}
}
/* Notification for continue collecting statistics */
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
/* Delete all not presented Group Nodes */
deleteAllNotPresentNode(fNodeIdent, tx, Collections.unmodifiableList(existGroupKeys));
/* Notification for continue collecting statistics */
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
}
statGroupCommit(((GroupStatisticsUpdated) notif).getGroupStats(), nodeIdent, tx);
}
- if (notifGroup.isPresent()) {
- notifyToCollectNextStatistics(nodeIdent);
+ if ( ! notifGroup.isPresent()) {
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
}
});
/* Delete all not presented Meter Nodes */
deleteAllNotPresentedNodes(fNodeIdent, tx, Collections.unmodifiableList(existMeterKeys));
/* Notification for continue collecting statistics */
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
}
statMeterCommit(((MeterStatisticsUpdated) notif).getMeterStats(), nodeIdent, tx);
}
- if (notifMeter.isPresent()) {
- notifyToCollectNextStatistics(nodeIdent);
+ if ( ! notifMeter.isPresent()) {
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
}
});
/* Delete all not presented Group Nodes */
deleteAllNotPresentedNodes(nodeIdent, tx, Collections.unmodifiableMap(existQueueKeys));
/* Notification for continue collecting statistics */
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
statPortCommit(portStats, nodeIdent, trans);
/* Notification for continue collecting statistics - Port statistics are still same size
* and they are small - don't need to wait for whole apply operation*/
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
statTableCommit(tableStats, nodeIdent, trans);
/* Notification for continue collecting statistics - Tables statistics are still same size
* and they are small - don't need to wait to whole apply operation */
- notifyToCollectNextStatistics(nodeIdent);
+ notifyToCollectNextStatistics(nodeIdent, transId);
}
});
}
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
- private final static long STAT_COLLECT_TIME_OUT = 30000L;
+ private final static long STAT_COLLECT_TIME_OUT = 3000L;
private final ExecutorService statNetCollectorServ;
private final StatisticsManager manager;
private final Object statCollectorLock = new Object();
private final Object statNodeHolderLock = new Object();
+ private final Object transNotifyLock = new Object();
private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
private volatile boolean wakeMe = false;
private volatile boolean finishing = false;
+ private TransactionId actualTransactionId;
public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
final int maxNodeForCollectors) {
public void close() {
statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
}
if (statNodeHolder.isEmpty()) {
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
return true;
}
@Override
- public void collectNextStatistics() {
- if (wakeMe) {
- synchronized (statCollectorLock) {
- if (wakeMe) {
- LOG.trace("STAT-COLLECTOR is notified to conntinue");
- statCollectorLock.notify();
+ public void collectNextStatistics(final TransactionId xid) {
+ if (checkTransactionId(xid)) {
+ if (wakeMe) {
+ synchronized (statCollectorLock) {
+ if (wakeMe) {
+ LOG.trace("STAT-COLLECTOR is notified to conntinue");
+ statCollectorLock.notify();
+ }
}
}
}
@Override
public void run() {
try {
+ // sleep 5 second before collecting all statistics cycles is important
+ // for loading all Nodes to Operational/DS
Thread.sleep(5000);
}
catch (final InterruptedException e1) {
} catch (final InterruptedException e) {
LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
} finally {
+ setActualTransactionId(null);
wakeMe = false;
}
}
if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
break;
}
- switch (statMarker) {
- case PORT_STATS:
- LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
- waitingForNotification();
- break;
- case QUEUE_STATS:
- LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
- waitingForNotification();
- break;
- case TABLE_STATS:
- LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
- waitingForNotification();
- break;
- case GROUP_STATS:
- LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
- waitingForNotification();
- break;
- case METER_STATS:
- LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
- waitingForNotification();
- break;
- case FLOW_STATS:
- LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
- waitingForNotification();
- LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
- for (short i = 0; i < maxTables; i++) {
- final TableId tableId = new TableId(i);
- manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ try {
+ switch (statMarker) {
+ case PORT_STATS:
+ LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case QUEUE_STATS:
+ LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case TABLE_STATS:
+ LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case GROUP_STATS:
+ LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case METER_STATS:
+ LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case FLOW_STATS:
+ LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+ waitingForNotification();
+ LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+ for (short i = 0; i < maxTables; i++) {
+ final TableId tableId = new TableId(i);
+ manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ }
+ break;
+ default:
+ /* Exception for programmers in implementation cycle */
+ throw new IllegalStateException("Not implemented ASK for " + statMarker);
}
- break;
- default:
- /* Exception for programmers in implementation cycle */
- throw new IllegalStateException("Not implemented ASK for " + statMarker);
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+ continue;
}
}
}
}
return true;
}
+
+ private boolean checkTransactionId(final TransactionId xid) {
+ synchronized (transNotifyLock) {
+ return actualTransactionId != null && actualTransactionId.equals(xid);
+ }
+ }
+
+ private void setActualTransactionId(final TransactionId transactionId) {
+ synchronized (transNotifyLock) {
+ actualTransactionId = transactionId;
+ }
+ }
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetMeterFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
private final Cache<String, TransactionCacheContainer<? super TransactionAware>> txCache;
- private final long maxLifeForRequest = 50; /* 50 second */
private final int queueCapacity = 5000;
private final OpendaylightGroupStatisticsService groupStatsService;
private volatile boolean finishing = false;
public StatRpcMsgManagerImpl (final StatisticsManager manager,
- final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+ final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) {
Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
groupStatsService = Preconditions.checkNotNull(
"OpendaylightQueueStatisticsService can not be null!");
statsRpcJobQueue = new LinkedBlockingQueue<>(queueCapacity);
- txCache = CacheBuilder.newBuilder().expireAfterWrite(maxLifeForRequest, TimeUnit.SECONDS)
+ /* nr. 7 is here nr. of possible statistic which are waiting for notification
+ * - check it in StatPermCollectorImpl method collectStatCrossNetwork */
+ txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * 7), TimeUnit.SECONDS)
.maximumSize(10000).build();
}
@Override
public <T extends TransactionAware, D extends DataObject> void registrationRpcFutureCallBack(
- final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef) {
+ final Future<RpcResult<T>> future, final D inputObj, final NodeRef nodeRef,
+ final SettableFuture<TransactionId> resultTransId) {
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),
new FutureCallback<RpcResult<? extends TransactionAware>>() {
if (id == null) {
LOG.warn("No protocol support");
} else {
+ if (resultTransId != null) {
+ resultTransId.set(id);
+ }
final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
final String cacheKey = buildCacheKey(id, nodeKey.getId());
final TransactionCacheContainer<? super TransactionAware> container =
}
@Override
- public void getAllGroupsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllGroupsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() {
@Override
new GetAllGroupStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
- .getAllGroupStatistics(builder.build()), null, nodeRef);
+ .getAllGroupStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGroupStat);
+ return result;
}
@Override
- public void getAllMetersStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllMetersStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() {
@Override
new GetAllMeterStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
- .getAllMeterStatistics(builder.build()), null, nodeRef);
+ .getAllMeterStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllMeterStat);
+ return result;
}
@Override
- public void getAllFlowsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllFlowsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() {
@Override
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowStatsService
- .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef);
+ .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllFlowStat);
+ return result;
}
@Override
tbuilder.setId(tableId.getValue());
tbuilder.setKey(new TableKey(tableId.getValue()));
registrationRpcFutureCallBack(flowStatsService
- .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+ .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null);
return null;
}
};
}
@Override
- public void getAllPortsStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllPortsStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() {
@Override
final GetAllNodeConnectorsStatisticsInputBuilder builder =
new GetAllNodeConnectorsStatisticsInputBuilder();
builder.setNode(nodeRef);
- registrationRpcFutureCallBack(portStatsService
- .getAllNodeConnectorsStatistics(builder.build()), null, nodeRef);
+ final Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> rpc =
+ portStatsService.getAllNodeConnectorsStatistics(builder.build());
+ registrationRpcFutureCallBack(rpc, null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllPortsStat);
+ return result;
}
@Override
- public void getAllTablesStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllTablesStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllTableStat = new RpcJobsQueue() {
@Override
new GetFlowTablesStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(flowTableStatsService
- .getFlowTablesStatistics(builder.build()), null, nodeRef);
+ .getFlowTablesStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllTableStat);
+ return result;
}
@Override
- public void getAllQueueStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllQueueStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() {
@Override
new GetAllQueuesStatisticsFromAllPortsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(queueStatsService
- .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef);
+ .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllQueueStat);
+ return result;
}
@Override
- public void getAllMeterConfigStat(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllMeterConfigStat(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() {
@Override
new GetAllMeterConfigStatisticsInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(meterStatsService
- .getAllMeterConfigStatistics(builder.build()), null, nodeRef);
+ .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(qetAllMeterConfStat);
+ return result;
}
@Override
/* RPC input */
final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder();
input.setNode(nodeRef);
- registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef);
+ registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null);
return null;
}
};
/* RPC input */
final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder();
input.setNode(nodeRef);
- registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef);
+ registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null);
return null;
}
};
}
@Override
- public void getAllGroupsConfStats(final NodeRef nodeRef) {
+ public Future<TransactionId> getAllGroupsConfStats(final NodeRef nodeRef) {
Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+ final SettableFuture<TransactionId> result = SettableFuture.create();
final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() {
@Override
new GetGroupDescriptionInputBuilder();
builder.setNode(nodeRef);
registrationRpcFutureCallBack(groupStatsService
- .getGroupDescription(builder.build()), null, nodeRef);
+ .getGroupDescription(builder.build()), null, nodeRef, result);
return null;
}
};
addGetAllStatJob(getAllGropConfStat);
+ return result;
}
public class TransactionCacheContainerImpl<T extends TransactionAware> implements TransactionCacheContainer<T> {
package org.opendaylight.controller.md.statistics.manager.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* statistics-manager
private final StatisticsManagerConfig statManagerConfig;
- public StatisticsManagerImpl (final DataBroker dataBroker, StatisticsManagerConfig statManagerconfig) {
- this.statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
+ public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
+ statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
public void start(final NotificationProviderService notifService,
final RpcConsumerRegistry rpcRegistry) {
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
- rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMinRequestNetMonitorInterval());
+ rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
}
@Override
- public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
+ public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
for (final StatPermCollector collector : statCollectors) {
if (collector.isProvidedFlowNodeActive(nodeIdent)) {
- collector.collectNextStatistics();
+ collector.collectNextStatistics(xid);
}
}
}