import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* This class handles the lifecycle of per-node statistics. It receives data
*
* @author avishnoi@in.ibm.com
*/
-public final class NodeStatisticsHandler implements AutoCloseable {
+public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
private static final int NUMBER_OF_WAIT_CYCLES = 2;
+ private final MultipartMessageManager msgManager = new MultipartMessageManager();
private final InstanceIdentifier<Node> targetNodeIdentifier;
private final FlowStatsTracker flowStats;
private final FlowTableStatsTracker flowTableStats;
private final NodeRef targetNodeRef;
private final NodeKey targetNodeKey;
- public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey) {
+ public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
+ final OpendaylightFlowStatisticsService flowStatsService,
+ final OpendaylightFlowTableStatisticsService flowTableStatsService,
+ final OpendaylightGroupStatisticsService groupStatsService,
+ final OpendaylightMeterStatisticsService meterStatsService,
+ final OpendaylightPortStatisticsService portStatsService,
+ final OpendaylightQueueStatisticsService queueStatsService) {
this.dps = Preconditions.checkNotNull(dps);
this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
this.targetNodeRef = new NodeRef(targetNodeIdentifier);
final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
- flowStats = new FlowStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- flowTableStats = new FlowTableStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- groupDescStats = new GroupDescStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- groupStats = new GroupStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- meterConfigStats = new MeterConfigStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- meterStats = new MeterStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- nodeConnectorStats = new NodeConnectorStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
- queueStats = new QueueStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+
+ flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
+ flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
+ groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
+ groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
+ meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
+ meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
+ nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
+ queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
}
public NodeKey getTargetNodeKey() {
return targetNodeKey;
}
- public Collection<TableKey> getKnownTables() {
- return flowTableStats.getTables();
- }
-
- public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+ @Override
+ public InstanceIdentifier<Node> getNodeIdentifier() {
return targetNodeIdentifier;
}
- public NodeRef getTargetNodeRef() {
+ @Override
+ public NodeRef getNodeRef() {
return targetNodeRef;
}
- public synchronized void updateGroupDescStats(List<GroupDescStats> list) {
- groupDescStats.updateStats(list);
+ @Override
+ public DataModificationTransaction startDataModification() {
+ return dps.beginTransaction();
}
- public synchronized void updateGroupStats(List<GroupStats> list) {
- groupStats.updateStats(list);
+ public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ groupDescStats.updateStats(list);
+ }
}
- public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
- meterConfigStats.updateStats(list);
+ public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ groupStats.updateStats(list);
+ }
}
- public synchronized void updateMeterStats(List<MeterStats> list) {
- meterStats.updateStats(list);
+ public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ meterConfigStats.updateStats(list);
+ }
}
- public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
- queueStats.updateStats(list);
+ public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ meterStats.updateStats(list);
+ }
}
- public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
- flowTableStats.updateStats(list);
+ public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ queueStats.updateStats(list);
+ }
}
- public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
- nodeConnectorStats.updateStats(list);
+ public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ flowTableStats.updateStats(list);
+ }
+ }
+
+ public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ nodeConnectorStats.updateStats(list);
+ }
}
- public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+ public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
+ final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
if (tableId != null) {
final DataModificationTransaction trans = dps.beginTransaction();
-
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
trans.putOperationalData(tableRef, tableBuilder.build());
- // FIXME: should we be tracking this data?
trans.commit();
}
}
+ public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ flowStats.updateStats(list);
+ }
+ }
+
public synchronized void updateGroupFeatures(GroupFeatures notification) {
final DataModificationTransaction trans = dps.beginTransaction();
trans.commit();
}
- public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
- flowStats.updateStats(list);
- }
-
public synchronized void cleanStaleStatistics() {
final DataModificationTransaction trans = dps.beginTransaction();
final long now = System.nanoTime();
meterStats.cleanup(trans, now);
nodeConnectorStats.cleanup(trans, now);
queueStats.cleanup(trans, now);
+ msgManager.cleanStaleTransactionIds();
trans.commit();
}
+ public synchronized void requestPeriodicStatistics() {
+ logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
+
+ flowTableStats.request();
+
+ // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+ // comes back -- we do not have any tables anyway.
+ final Collection<TableKey> tables = flowTableStats.getTables();
+ logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
+ for (final TableKey key : tables) {
+ logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
+ flowStats.requestAggregateFlows(key);
+ }
+
+ flowStats.requestAllFlowsAllTables();
+ nodeConnectorStats.request();
+ groupStats.request();
+ groupDescStats.request();
+ meterStats.request();
+ meterConfigStats.request();
+ queueStats.request();
+ }
+
+ public synchronized void start() {
+ flowStats.start(dps);
+ groupDescStats.start(dps);
+ groupStats.start(dps);
+ meterConfigStats.start(dps);
+ meterStats.start(dps);
+ queueStats.start(dps);
+
+ requestPeriodicStatistics();
+ }
+
@Override
- public void close() {
- // FIXME: cleanup any resources we hold (registrations, etc.)
+ public synchronized void close() {
+ flowStats.close();
+ groupDescStats.close();
+ groupStats.close();
+ meterConfigStats.close();
+ meterStats.close();
+ queueStats.close();
+
logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
}
+
+ @Override
+ public void registerTransaction(final ListenableFuture<TransactionId> future, final StatsRequestType type) {
+ Futures.addCallback(future, new FutureCallback<TransactionId>() {
+ @Override
+ public void onSuccess(TransactionId result) {
+ msgManager.recordExpectedTransaction(result, type);
+ logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
+ }
+ });
+ }
+
+ @Override
+ public void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
+ Futures.addCallback(future, new FutureCallback<TransactionId>() {
+ @Override
+ public void onSuccess(TransactionId result) {
+ msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id);
+ logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
+ }
+ });
+ }
}