X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2FNodeStatisticsHandler.java;h=dbcbab982a9aec997e37a2fb09e763bb3f3c5f96;hp=413c01b1bc132a625b454b9c5e9bd0193d849e4b;hb=aae447eb2ce6272e1bfd2517a6493bf2ea40ed7a;hpb=fd3193d2efe85828156e6a670b5c06a0ea485a98 diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java index 413c01b1bc..dbcbab982a 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java @@ -7,18 +7,17 @@ */ package org.opendaylight.controller.md.statistics.manager; -import java.util.Collection; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService; @@ -28,7 +27,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService; @@ -36,7 +34,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; @@ -59,9 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * This class handles the lifecycle of per-node statistics. It receives data @@ -72,9 +66,13 @@ import com.google.common.util.concurrent.ListenableFuture; */ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext { private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class); + + private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15); + private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5); private static final int NUMBER_OF_WAIT_CYCLES = 2; - private final MultipartMessageManager msgManager = new MultipartMessageManager(); + private final MultipartMessageManager msgManager; + private final StatisticsRequestScheduler srScheduler; private final InstanceIdentifier targetNodeIdentifier; private final FlowStatsTracker flowStats; private final FlowTableStatsTracker flowTableStats; @@ -87,6 +85,17 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo private final DataProviderService dps; private final NodeRef targetNodeRef; private final NodeKey targetNodeKey; + private final TimerTask task = new TimerTask() { + @Override + public void run() { + try{ + requestPeriodicStatistics(); + cleanStaleStatistics(); + }catch(Exception e){ + logger.warn("Exception occured while sending statistics request : {}",e); + } + } + }; public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey, final OpendaylightFlowStatisticsService flowStatsService, @@ -94,49 +103,25 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo final OpendaylightGroupStatisticsService groupStatsService, final OpendaylightMeterStatisticsService meterStatsService, final OpendaylightPortStatisticsService portStatsService, - final OpendaylightQueueStatisticsService queueStatsService) { + final OpendaylightQueueStatisticsService queueStatsService, + final StatisticsRequestScheduler srScheduler) { this.dps = Preconditions.checkNotNull(dps); this.targetNodeKey = Preconditions.checkNotNull(nodeKey); + this.srScheduler = Preconditions.checkNotNull(srScheduler); this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build(); this.targetNodeRef = new NodeRef(targetNodeIdentifier); - final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); - - if (flowStatsService != null) { - flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos); - } else { - flowStats = null; - } - if (flowTableStatsService != null) { - flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos); - } else { - flowTableStats = null; - } - - if (groupStatsService != null) { - groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos); - groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos); - } else { - groupDescStats = null; - groupStats = null; - } - if (meterStatsService != null) { - meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos); - meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos); - } else { - meterConfigStats = null; - meterStats = null; - } - if (portStatsService != null) { - nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos); - } else { - nodeConnectorStats = null; - } - if (queueStatsService != null) { - queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos); - } else { - queueStats = null; - } + final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES); + + msgManager = new MultipartMessageManager(lifetimeNanos); + flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this); + flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats); + groupDescStats = new GroupDescStatsTracker(groupStatsService, this); + groupStats = new GroupStatsTracker(groupStatsService, this); + meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this); + meterStats = new MeterStatsTracker(meterStatsService, this); + nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this); + queueStats = new QueueStatsTracker(queueStatsService, this); } public NodeKey getTargetNodeKey() { @@ -155,55 +140,57 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo @Override public DataModificationTransaction startDataModification() { - return dps.beginTransaction(); + DataModificationTransaction dmt = dps.beginTransaction(); + dmt.registerListener(this.srScheduler); + return dmt; } - public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupDescStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupDescStats.updateStats(list); } } - public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateGroupStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { groupStats.updateStats(list); } } - public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterConfigStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterConfigStats.updateStats(list); } } - public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateMeterStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { meterStats.updateStats(list); } } - public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateQueueStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { queueStats.updateStats(list); } } - public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowTableStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowTableStats.updateStats(list); } } - public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateNodeConnectorStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { nodeConnectorStats.updateStats(list); } } - public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) { - final Short tableId = msgManager.isExpectedTableTransaction(transaction, more); + public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) { + final Short tableId = msgManager.isExpectedTableTransaction(transaction); if (tableId != null) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); @@ -224,14 +211,14 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } } - public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List list) { - if (msgManager.isExpectedTransaction(transaction, more)) { + public synchronized void updateFlowStats(TransactionAware transaction, List list) { + if (msgManager.isExpectedTransaction(transaction)) { flowStats.updateStats(list); } } public synchronized void updateGroupFeatures(GroupFeatures notification) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -249,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void updateMeterFeatures(MeterFeatures features) { - final DataModificationTransaction trans = dps.beginTransaction(); + final DataModificationTransaction trans = this.startDataModification(); final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(targetNodeKey); @@ -267,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo } public synchronized void cleanStaleStatistics() { - final DataModificationTransaction trans = dps.beginTransaction(); - final long now = System.nanoTime(); - - flowStats.cleanup(trans, now); - groupDescStats.cleanup(trans, now); - groupStats.cleanup(trans, now); - meterConfigStats.cleanup(trans, now); - meterStats.cleanup(trans, now); - nodeConnectorStats.cleanup(trans, now); - queueStats.cleanup(trans, now); + final DataModificationTransaction trans = this.startDataModification(); + + flowStats.cleanup(trans); + groupDescStats.cleanup(trans); + groupStats.cleanup(trans); + meterConfigStats.cleanup(trans); + meterStats.cleanup(trans); + nodeConnectorStats.cleanup(trans); + queueStats.cleanup(trans); msgManager.cleanStaleTransactionIds(); trans.commit(); @@ -285,114 +271,60 @@ public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableCo public synchronized void requestPeriodicStatistics() { logger.debug("Send requests for statistics collection to node : {}", targetNodeKey); - if (flowTableStats != null){ - registerTransaction(flowTableStats.request(), StatsRequestType.ALL_FLOW); - } - if (flowStats != null){ - // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest() - // comes back -- we do not have any tables anyway. - final Collection tables = flowTableStats.getTables(); - logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size()); - for (final TableKey key : tables) { - logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey); - registerTableTransaction(flowStats.requestAggregateFlows(key), key.getId()); - } - - registerTransaction(flowStats.requestAllFlowsAllTables(), StatsRequestType.ALL_FLOW); - } - - if (nodeConnectorStats != null) { - registerTransaction(nodeConnectorStats.request(), StatsRequestType.ALL_PORT); - } - - if (groupStats != null) { - registerTransaction(groupStats.request(), StatsRequestType.ALL_GROUP); - } - sendGroupDescriptionRequest(); + this.srScheduler.addRequestToSchedulerQueue(flowTableStats); + + this.srScheduler.addRequestToSchedulerQueue(flowStats); + + this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats); + + this.srScheduler.addRequestToSchedulerQueue(groupStats); + + this.srScheduler.addRequestToSchedulerQueue(groupDescStats); + + this.srScheduler.addRequestToSchedulerQueue(meterStats); + + this.srScheduler.addRequestToSchedulerQueue(meterConfigStats); + + this.srScheduler.addRequestToSchedulerQueue(queueStats); + } + + public synchronized void start(final Timer timer) { + flowStats.start(dps); + groupDescStats.start(dps); + groupStats.start(dps); + meterConfigStats.start(dps); + meterStats.start(dps); + queueStats.start(dps); - if (meterStats != null) { - registerTransaction(meterStats.request(), StatsRequestType.ALL_METER); - } - sendMeterConfigStatisticsRequest(); + timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS); - if(queueStats != null) { - registerTransaction(queueStats.request(), StatsRequestType.ALL_QUEUE_STATS); - } - } + logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS); - public synchronized void start() { requestPeriodicStatistics(); } @Override public synchronized void close() { - // FIXME: cleanup any resources we hold (registrations, etc.) - logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); - } - - synchronized void sendFlowStatsFromTableRequest(Flow flow) { - if (flowStats == null) { - logger.debug("No Flow statistics service, not sending a request"); - return; - } - - registerTransaction(flowStats.requestFlow(flow), StatsRequestType.ALL_FLOW); - } - - synchronized void sendGroupDescriptionRequest() { - if (groupStats == null) { - logger.debug("No Group Descriptor statistics service, not sending a request"); - return; - } - - registerTransaction(groupDescStats.request(), StatsRequestType.GROUP_DESC); - } - - synchronized void sendMeterConfigStatisticsRequest() { - if (meterConfigStats == null) { - logger.debug("No Meter Config statistics service, not sending a request"); - return; - } - - registerTransaction(meterConfigStats.request(), StatsRequestType.METER_CONFIG); - } - - synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) { - if (queueStats == null) { - logger.debug("No Queue statistics service, not sending a request"); - return; - } + task.cancel(); + flowStats.close(); + groupDescStats.close(); + groupStats.close(); + meterConfigStats.close(); + meterStats.close(); + queueStats.close(); - registerTransaction(queueStats.request(nodeConnectorId, queueId), StatsRequestType.ALL_QUEUE_STATS); + logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); } - private void registerTransaction(final ListenableFuture future, final StatsRequestType type) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTransaction(result, type); - logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send statistics request for node {}", targetNodeKey, t); - } - }); + @Override + public void registerTransaction(TransactionId id) { + msgManager.recordExpectedTransaction(id); + logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey); } - private void registerTableTransaction(final ListenableFuture future, final Short id) { - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(TransactionId result) { - msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id); - logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t); - } - }); + @Override + public void registerTableTransaction(final TransactionId id, final Short table) { + msgManager.recordExpectedTableTransaction(id, table); + logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table); } }