import java.util.Collection;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* This class handles the lifecycle of per-node statistics. It receives data
*/
public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
+
+ private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
+ private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final int NUMBER_OF_WAIT_CYCLES = 2;
- private final MultipartMessageManager msgManager = new MultipartMessageManager();
+ private final MultipartMessageManager msgManager;
private final InstanceIdentifier<Node> targetNodeIdentifier;
private final FlowStatsTracker flowStats;
private final FlowTableStatsTracker flowTableStats;
private final DataProviderService dps;
private final NodeRef targetNodeRef;
private final NodeKey targetNodeKey;
+ private final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ try{
+ requestPeriodicStatistics();
+ cleanStaleStatistics();
+ }catch(Exception e){
+ logger.warn("Exception occured while sending statistics request : {}",e);
+ }
+ }
+ };
public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
final OpendaylightFlowStatisticsService flowStatsService,
this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
this.targetNodeRef = new NodeRef(targetNodeIdentifier);
- final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+ final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+ msgManager = new MultipartMessageManager(lifetimeNanos);
flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
return dps.beginTransaction();
}
- public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
groupDescStats.updateStats(list);
}
}
- public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateGroupStats(TransactionAware transaction, List<GroupStats> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
groupStats.updateStats(list);
}
}
- public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateMeterConfigStats(TransactionAware transaction, List<MeterConfigStats> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
meterConfigStats.updateStats(list);
}
}
- public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateMeterStats(TransactionAware transaction, List<MeterStats> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
meterStats.updateStats(list);
}
}
- public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateQueueStats(TransactionAware transaction, List<QueueIdAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
queueStats.updateStats(list);
}
}
- public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateFlowTableStats(TransactionAware transaction, List<FlowTableAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
flowTableStats.updateStats(list);
}
}
- public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateNodeConnectorStats(TransactionAware transaction, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
nodeConnectorStats.updateStats(list);
}
}
- public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
- final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
+ public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
+ final Short tableId = msgManager.isExpectedTableTransaction(transaction);
if (tableId != null) {
final DataModificationTransaction trans = dps.beginTransaction();
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
}
}
- public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
- if (msgManager.isExpectedTransaction(transaction, more)) {
+ public synchronized void updateFlowStats(TransactionAware transaction, List<FlowAndStatisticsMapList> list) {
+ if (msgManager.isExpectedTransaction(transaction)) {
flowStats.updateStats(list);
}
}
queueStats.request();
}
- public synchronized void start() {
+ public synchronized void start(final Timer timer) {
flowStats.start(dps);
groupDescStats.start(dps);
groupStats.start(dps);
meterStats.start(dps);
queueStats.start(dps);
+ timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS);
+
+ logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS);
+
requestPeriodicStatistics();
}
@Override
public synchronized void close() {
+ task.cancel();
flowStats.close();
groupDescStats.close();
groupStats.close();
}
@Override
- public void registerTransaction(final ListenableFuture<TransactionId> future, final StatsRequestType type) {
- Futures.addCallback(future, new FutureCallback<TransactionId>() {
- @Override
- public void onSuccess(TransactionId result) {
- msgManager.recordExpectedTransaction(result, type);
- logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
- }
- });
+ public void registerTransaction(TransactionId id) {
+ msgManager.recordExpectedTransaction(id);
+ logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey);
}
@Override
- public void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
- Futures.addCallback(future, new FutureCallback<TransactionId>() {
- @Override
- public void onSuccess(TransactionId result) {
- msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id);
- logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
- }
- });
+ public void registerTableTransaction(final TransactionId id, final Short table) {
+ msgManager.recordExpectedTableTransaction(id, table);
+ logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table);
}
}