X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2FNodeStatisticsHandler.java;h=db216237d01f7f8fe40523c2c62c6b6ecb88abde;hb=83e1c610eeefba667a19c243fbc1098072a8079d;hp=d62b680e1466c89d88180b51b0d89a97b2120602;hpb=3a827de1a10cd2c88a09eefc0ec518fa00f8dc02;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java index d62b680e14..db216237d0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java @@ -7,8 +7,9 @@ */ package org.opendaylight.controller.md.statistics.manager; -import java.util.Collection; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; @@ -55,9 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * This class handles the lifecycle of per-node statistics. It receives data @@ -68,9 +66,13 @@ import com.google.common.util.concurrent.ListenableFuture; */ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext { private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class); + + private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15); + private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5); private static final int NUMBER_OF_WAIT_CYCLES = 2; - private final MultipartMessageManager msgManager = new MultipartMessageManager(); + private final MultipartMessageManager msgManager; + private final StatisticsRequestScheduler srScheduler; private final InstanceIdentifier targetNodeIdentifier; private final FlowStatsTracker flowStats; private final FlowTableStatsTracker flowTableStats; @@ -83,6 +85,17 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo private final DataProviderService dps; private final NodeRef targetNodeRef; private final NodeKey targetNodeKey; + private final TimerTask task = new TimerTask() { + @Override + public void run() { + try{ + requestPeriodicStatistics(); + cleanStaleStatistics(); + }catch(Exception e){ + logger.warn("Exception occured while sending statistics request : {}",e); + } + } + }; public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey, final OpendaylightFlowStatisticsService flowStatsService, @@ -90,22 +103,25 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo final OpendaylightGroupStatisticsService groupStatsService, final OpendaylightMeterStatisticsService meterStatsService, final OpendaylightPortStatisticsService portStatsService, - final OpendaylightQueueStatisticsService queueStatsService) { + final OpendaylightQueueStatisticsService queueStatsService, + final StatisticsRequestScheduler srScheduler) { this.dps = Preconditions.checkNotNull(dps); this.targetNodeKey = Preconditions.checkNotNull(nodeKey); + this.srScheduler = Preconditions.checkNotNull(srScheduler); this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build(); this.targetNodeRef = new NodeRef(targetNodeIdentifier); - final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); - - flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos); - flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos); - groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos); - groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos); - meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos); - meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos); - nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos); - queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos); + final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); + + msgManager = new MultipartMessageManager(lifetimeNanos); + flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this); + flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats); + groupDescStats = new GroupDescStatsTracker(groupStatsService, this); + groupStats = new GroupStatsTracker(groupStatsService, this); + meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this); + meterStats = new MeterStatsTracker(meterStatsService, this); + nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this); + queueStats = new QueueStatsTracker(queueStatsService, this); } public NodeKey getTargetNodeKey() { @@ -124,55 +140,57 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo @Override public DataModificationTransaction startDataModification() { - return dps.beginTransaction(); + DataModificationTransaction dmt = dps.beginTransaction(); + dmt.registerListener(this.srScheduler); + return dmt; } - public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupDescStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupDescStats.updateStats(list); } } - public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupStats.updateStats(list); } } - public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterConfigStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterConfigStats.updateStats(list); } } - public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterStats.updateStats(list); } } - public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateQueueStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { queueStats.updateStats(list); } } - public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowTableStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowTableStats.updateStats(list); } } - public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateNodeConnectorStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { nodeConnectorStats.updateStats(list); } } - public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) { - final Short tableId = msgManager.isExpectedTableTransaction(transaction, more); + public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) { + final Short tableId = msgManager.isExpectedTableTransaction(transaction); if (tableId != null) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); @@ -193,14 +211,14 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } } - public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowStats.updateStats(list); } } public synchronized void updateGroupFeatures(GroupFeatures notification) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -218,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void updateMeterFeatures(MeterFeatures features) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -236,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void cleanStaleStatistics() { - final DataModificationTransaction trans = dps.beginTransaction(); - final long now = System.nanoTime(); - - flowStats.cleanup(trans, now); - groupDescStats.cleanup(trans, now); - groupStats.cleanup(trans, now); - meterConfigStats.cleanup(trans, now); - meterStats.cleanup(trans, now); - nodeConnectorStats.cleanup(trans, now); - queueStats.cleanup(trans, now); + final DataModificationTransaction trans = this.startDataModification(); + + flowStats.cleanup(trans); + groupDescStats.cleanup(trans); + groupStats.cleanup(trans); + meterConfigStats.cleanup(trans); + meterStats.cleanup(trans); + nodeConnectorStats.cleanup(trans); + queueStats.cleanup(trans); msgManager.cleanStaleTransactionIds(); trans.commit(); @@ -254,27 +271,24 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo public synchronized void requestPeriodicStatistics() { logger.debug("Send requests for statistics collection to node : {}", targetNodeKey); - flowTableStats.request(); + this.srScheduler.addRequestToSchedulerQueue(flowTableStats); - // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest() - // comes back -- we do not have any tables anyway. - final Collection tables = flowTableStats.getTables(); - logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size()); - for (final TableKey key : tables) { - logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey); - flowStats.requestAggregateFlows(key); - } + this.srScheduler.addRequestToSchedulerQueue(flowStats); + + this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats); + + this.srScheduler.addRequestToSchedulerQueue(groupStats); + + this.srScheduler.addRequestToSchedulerQueue(groupDescStats); + + this.srScheduler.addRequestToSchedulerQueue(meterStats); + + this.srScheduler.addRequestToSchedulerQueue(meterConfigStats); - flowStats.requestAllFlowsAllTables(); - nodeConnectorStats.request(); - groupStats.request(); - groupDescStats.request(); - meterStats.request(); - meterConfigStats.request(); - queueStats.request(); + this.srScheduler.addRequestToSchedulerQueue(queueStats); } - public synchronized void start() { + public synchronized void start(final Timer timer) { flowStats.start(dps); groupDescStats.start(dps); groupStats.start(dps); @@ -282,11 +296,16 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo meterStats.start(dps); queueStats.start(dps); + timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS); + + logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS); + requestPeriodicStatistics(); } @Override public synchronized void close() { + task.cancel(); flowStats.close(); groupDescStats.close(); groupStats.close(); @@ -294,38 +313,21 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo meterStats.close(); queueStats.close(); + //Clean up queued statistics request from scheduler queue + srScheduler.removeRequestsFromSchedulerQueue(this.getNodeRef()); + logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); } @Override - public void registerTransaction(final ListenableFuture future) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTransaction(result); - logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send statistics request for node {}", targetNodeKey, t); - } - }); + public void registerTransaction(TransactionId id) { + msgManager.recordExpectedTransaction(id); + logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey); } @Override - public void registerTableTransaction(final ListenableFuture future, final Short id) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTableTransaction(result, id); - logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t); - } - }); + public void registerTableTransaction(final TransactionId id, final Short table) { + msgManager.recordExpectedTableTransaction(id, table); + logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table); } }