X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2FNodeStatisticsHandler.java;h=db216237d01f7f8fe40523c2c62c6b6ecb88abde;hb=48814d6a264b8f13e5db1422336d9ef25cb05fa9;hp=691b9c0b15f58a9d6a01d05e7146c8a69251e332;hpb=e41bd0a8c722f09d901dd17b23480ad928c3e784;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java index 691b9c0b15..db216237d0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java @@ -7,11 +7,11 @@ */ package org.opendaylight.controller.md.statistics.manager; -import java.util.Collection; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -56,9 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * This class handles the lifecycle of per-node statistics. It receives data @@ -69,9 +66,13 @@ import com.google.common.util.concurrent.ListenableFuture; */ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext { private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class); + + private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15); + private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5); private static final int NUMBER_OF_WAIT_CYCLES = 2; - private final MultipartMessageManager msgManager = new MultipartMessageManager(); + private final MultipartMessageManager msgManager; + private final StatisticsRequestScheduler srScheduler; private final InstanceIdentifier targetNodeIdentifier; private final FlowStatsTracker flowStats; private final FlowTableStatsTracker flowTableStats; @@ -84,6 +85,17 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo private final DataProviderService dps; private final NodeRef targetNodeRef; private final NodeKey targetNodeKey; + private final TimerTask task = new TimerTask() { + @Override + public void run() { + try{ + requestPeriodicStatistics(); + cleanStaleStatistics(); + }catch(Exception e){ + logger.warn("Exception occured while sending statistics request : {}",e); + } + } + }; public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey, final OpendaylightFlowStatisticsService flowStatsService, @@ -91,22 +103,25 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo final OpendaylightGroupStatisticsService groupStatsService, final OpendaylightMeterStatisticsService meterStatsService, final OpendaylightPortStatisticsService portStatsService, - final OpendaylightQueueStatisticsService queueStatsService) { + final OpendaylightQueueStatisticsService queueStatsService, + final StatisticsRequestScheduler srScheduler) { this.dps = Preconditions.checkNotNull(dps); this.targetNodeKey = Preconditions.checkNotNull(nodeKey); + this.srScheduler = Preconditions.checkNotNull(srScheduler); this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build(); this.targetNodeRef = new NodeRef(targetNodeIdentifier); - final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); - - flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos); - flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos); - groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos); - groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos); - meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos); - meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos); - nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos); - queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos); + final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); + + msgManager = new MultipartMessageManager(lifetimeNanos); + flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this); + flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats); + groupDescStats = new GroupDescStatsTracker(groupStatsService, this); + groupStats = new GroupStatsTracker(groupStatsService, this); + meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this); + meterStats = new MeterStatsTracker(meterStatsService, this); + nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this); + queueStats = new QueueStatsTracker(queueStatsService, this); } public NodeKey getTargetNodeKey() { @@ -125,55 +140,57 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo @Override public DataModificationTransaction startDataModification() { - return dps.beginTransaction(); + DataModificationTransaction dmt = dps.beginTransaction(); + dmt.registerListener(this.srScheduler); + return dmt; } - public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupDescStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupDescStats.updateStats(list); } } - public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupStats.updateStats(list); } } - public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterConfigStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterConfigStats.updateStats(list); } } - public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterStats.updateStats(list); } } - public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateQueueStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { queueStats.updateStats(list); } } - public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowTableStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowTableStats.updateStats(list); } } - public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateNodeConnectorStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { nodeConnectorStats.updateStats(list); } } - public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) { - final Short tableId = msgManager.isExpectedTableTransaction(transaction, more); + public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) { + final Short tableId = msgManager.isExpectedTableTransaction(transaction); if (tableId != null) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); @@ -194,14 +211,14 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } } - public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowStats.updateStats(list); } } public synchronized void updateGroupFeatures(GroupFeatures notification) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -219,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void updateMeterFeatures(MeterFeatures features) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -237,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void cleanStaleStatistics() { - final DataModificationTransaction trans = dps.beginTransaction(); - final long now = System.nanoTime(); - - flowStats.cleanup(trans, now); - groupDescStats.cleanup(trans, now); - groupStats.cleanup(trans, now); - meterConfigStats.cleanup(trans, now); - meterStats.cleanup(trans, now); - nodeConnectorStats.cleanup(trans, now); - queueStats.cleanup(trans, now); + final DataModificationTransaction trans = this.startDataModification(); + + flowStats.cleanup(trans); + groupDescStats.cleanup(trans); + groupStats.cleanup(trans); + meterConfigStats.cleanup(trans); + meterStats.cleanup(trans); + nodeConnectorStats.cleanup(trans); + queueStats.cleanup(trans); msgManager.cleanStaleTransactionIds(); trans.commit(); @@ -255,27 +271,24 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo public synchronized void requestPeriodicStatistics() { logger.debug("Send requests for statistics collection to node : {}", targetNodeKey); - flowTableStats.request(); + this.srScheduler.addRequestToSchedulerQueue(flowTableStats); - // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest() - // comes back -- we do not have any tables anyway. - final Collection tables = flowTableStats.getTables(); - logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size()); - for (final TableKey key : tables) { - logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey); - flowStats.requestAggregateFlows(key); - } + this.srScheduler.addRequestToSchedulerQueue(flowStats); + + this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats); + + this.srScheduler.addRequestToSchedulerQueue(groupStats); + + this.srScheduler.addRequestToSchedulerQueue(groupDescStats); + + this.srScheduler.addRequestToSchedulerQueue(meterStats); + + this.srScheduler.addRequestToSchedulerQueue(meterConfigStats); - flowStats.requestAllFlowsAllTables(); - nodeConnectorStats.request(); - groupStats.request(); - groupDescStats.request(); - meterStats.request(); - meterConfigStats.request(); - queueStats.request(); + this.srScheduler.addRequestToSchedulerQueue(queueStats); } - public synchronized void start() { + public synchronized void start(final Timer timer) { flowStats.start(dps); groupDescStats.start(dps); groupStats.start(dps); @@ -283,11 +296,16 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo meterStats.start(dps); queueStats.start(dps); + timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS); + + logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS); + requestPeriodicStatistics(); } @Override public synchronized void close() { + task.cancel(); flowStats.close(); groupDescStats.close(); groupStats.close(); @@ -295,38 +313,21 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo meterStats.close(); queueStats.close(); + //Clean up queued statistics request from scheduler queue + srScheduler.removeRequestsFromSchedulerQueue(this.getNodeRef()); + logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); } @Override - public void registerTransaction(final ListenableFuture future, final StatsRequestType type) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTransaction(result, type); - logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send statistics request for node {}", targetNodeKey, t); - } - }); + public void registerTransaction(TransactionId id) { + msgManager.recordExpectedTransaction(id); + logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey); } @Override - public void registerTableTransaction(final ListenableFuture future, final Short id) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id); - logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t); - } - }); + public void registerTableTransaction(final TransactionId id, final Short table) { + msgManager.recordExpectedTableTransaction(id, table); + logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table); } }