1) Introduced statistics request scheduler, to schedule request based on the current transaction load on MD-SAL DataStore.
Each node submit all individual statistics request to schedular for execution
2) Send statistics request if there is no MD-SAL trasaction pending.
It just keep tracks of the MD-SAL trasaction triggered by statistics-manager
3) Removal of stale statistics is now done based on counter rather then time values.
Time based removal will break in case of clustered environment.
4) Code clean up
Change-Id: Ie7522d0c60f2c7051dbfcdf9a6657843ef4da743
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
private static final Logger logger = LoggerFactory.getLogger(AbstractListeningStatsTracker.class);
private ListenerRegistration<?> reg;
- protected AbstractListeningStatsTracker(FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ protected AbstractListeningStatsTracker(FlowCapableContext context) {
+ super(context);
}
protected abstract InstanceIdentifier<?> listenPath();
abstract class AbstractStatsTracker<I, K> {
private static final Logger logger = LoggerFactory.getLogger(AbstractStatsTracker.class);
+
+ private static final int WAIT_FOR_REQUEST_CYCLE = 2;
+
private final FutureCallback<RpcResult<? extends TransactionAware>> callback =
new FutureCallback<RpcResult<? extends TransactionAware>>() {
@Override
private final Map<K, Long> trackedItems = new HashMap<>();
private final FlowCapableContext context;
- private final long lifetimeNanos;
+ private long requestCounter;
- protected AbstractStatsTracker(final FlowCapableContext context, final long lifetimeNanos) {
+ protected AbstractStatsTracker(final FlowCapableContext context) {
this.context = Preconditions.checkNotNull(context);
- this.lifetimeNanos = lifetimeNanos;
+ this.requestCounter = 0;
}
protected final InstanceIdentifierBuilder<Node> getNodeIdentifierBuilder() {
return context.startDataModification();
}
+ public final synchronized void increaseRequestCounter(){
+ this.requestCounter++;
+ }
protected abstract void cleanupSingleStat(DataModificationTransaction trans, K item);
protected abstract K updateSingleStat(DataModificationTransaction trans, I item);
+ public abstract void request();
public final synchronized void updateStats(List<I> list) {
- final Long expiryTime = System.nanoTime() + lifetimeNanos;
+
final DataModificationTransaction trans = startTransaction();
for (final I item : list) {
- trackedItems.put(updateSingleStat(trans, item), expiryTime);
+ trackedItems.put(updateSingleStat(trans, item), requestCounter);
}
trans.commit();
}
- public final synchronized void cleanup(final DataModificationTransaction trans, long now) {
+ /**
+ * Statistics will be cleaned up if not update in last two request cycles.
+ * @param trans
+ */
+ public final synchronized void cleanup(final DataModificationTransaction trans) {
for (Iterator<Entry<K, Long>> it = trackedItems.entrySet().iterator();it.hasNext();){
Entry<K, Long> e = it.next();
- if (now > e.getValue()) {
+ if (requestCounter >= e.getValue()+WAIT_FOR_REQUEST_CYCLE) {
cleanupSingleStat(trans, e.getKey());
it.remove();
}
*/
package org.opendaylight.controller.md.statistics.manager;
+import java.util.Collection;
import java.util.Map.Entry;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
private final OpendaylightFlowStatisticsService flowStatsService;
+ private FlowTableStatsTracker flowTableStats;
private int unaccountedFlowsCounter = 1;
- FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
+ super(context);
this.flowStatsService = flowStatsService;
}
+ FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, FlowTableStatsTracker flowTableStats) {
+ this(flowStatsService, context);
+ this.flowTableStats = flowTableStats;
+ }
@Override
protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
return "Flow";
}
+ @Override
+ public void request() {
+ // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+ // comes back -- we do not have any tables anyway.
+ final Collection<TableKey> tables = flowTableStats.getTables();
+ logger.debug("Node {} supports {} table(s)", this.getNodeRef(), tables.size());
+ for (final TableKey key : tables) {
+ logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), this.getNodeRef());
+ this.requestAggregateFlows(key);
+ }
+
+ this.requestAllFlowsAllTables();
+
+ }
public void requestAllFlowsAllTables() {
if (flowStatsService != null) {
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
private final Set<TableKey> tables = Collections.unmodifiableSet(privateTables);
private final OpendaylightFlowTableStatisticsService flowTableStatsService;
- FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ FlowTableStatsTracker(OpendaylightFlowTableStatisticsService flowTableStatsService, final FlowCapableContext context) {
+ super(context);
this.flowTableStatsService = flowTableStatsService;
}
return item;
}
+ @Override
public void request() {
if (flowTableStatsService != null) {
final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
private static final Logger logger = LoggerFactory.getLogger(GroupDescStatsTracker.class);
private final OpendaylightGroupStatisticsService groupStatsService;
- public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context, final long lifetimeNanos) {
- super(context, lifetimeNanos);
+ public GroupDescStatsTracker(OpendaylightGroupStatisticsService groupStatsService, final FlowCapableContext context) {
+ super(context);
this.groupStatsService = groupStatsService;
}
return "Group Descriptor";
}
+ @Override
public void request() {
if (groupStatsService != null) {
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
private static final Logger logger = LoggerFactory.getLogger(GroupStatsTracker.class);
private final OpendaylightGroupStatisticsService groupStatsService;
- GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ GroupStatsTracker(OpendaylightGroupStatisticsService groupStatsService, FlowCapableContext context) {
+ super(context);
this.groupStatsService = Preconditions.checkNotNull(groupStatsService);
}
return "Group";
}
+ @Override
public void request() {
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
input.setNode(getNodeRef());
private static final Logger logger = LoggerFactory.getLogger(MeterConfigStatsTracker.class);
private final OpendaylightMeterStatisticsService meterStatsService;
- protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ protected MeterConfigStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+ super(context);
this.meterStatsService = meterStatsService;
}
return item;
}
+ @Override
public void request() {
if (meterStatsService != null) {
GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
private static final Logger logger = LoggerFactory.getLogger(MeterStatsTracker.class);
private final OpendaylightMeterStatisticsService meterStatsService;
- MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ MeterStatsTracker(OpendaylightMeterStatisticsService meterStatsService, final FlowCapableContext context) {
+ super(context);
this.meterStatsService = meterStatsService;
}
return item;
}
+ @Override
public void request() {
if (meterStatsService != null) {
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsTracker.class);
private final OpendaylightPortStatisticsService portStatsService;
- NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ NodeConnectorStatsTracker(final OpendaylightPortStatisticsService portStatsService, final FlowCapableContext context) {
+ super(context);
this.portStatsService = portStatsService;
}
return item;
}
+ @Override
public void request() {
if (portStatsService != null) {
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
*/
package org.opendaylight.controller.md.statistics.manager;
-import java.util.Collection;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
private static final int NUMBER_OF_WAIT_CYCLES = 2;
private final MultipartMessageManager msgManager;
+ private final StatisticsRequestScheduler srScheduler;
private final InstanceIdentifier<Node> targetNodeIdentifier;
private final FlowStatsTracker flowStats;
private final FlowTableStatsTracker flowTableStats;
final OpendaylightGroupStatisticsService groupStatsService,
final OpendaylightMeterStatisticsService meterStatsService,
final OpendaylightPortStatisticsService portStatsService,
- final OpendaylightQueueStatisticsService queueStatsService) {
+ final OpendaylightQueueStatisticsService queueStatsService,
+ final StatisticsRequestScheduler srScheduler) {
this.dps = Preconditions.checkNotNull(dps);
this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
+ this.srScheduler = Preconditions.checkNotNull(srScheduler);
this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
this.targetNodeRef = new NodeRef(targetNodeIdentifier);
final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
msgManager = new MultipartMessageManager(lifetimeNanos);
- flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
- flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
- groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
- groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
- meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
- meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
- nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
- queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
+ flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this);
+ flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats);
+ groupDescStats = new GroupDescStatsTracker(groupStatsService, this);
+ groupStats = new GroupStatsTracker(groupStatsService, this);
+ meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this);
+ meterStats = new MeterStatsTracker(meterStatsService, this);
+ nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this);
+ queueStats = new QueueStatsTracker(queueStatsService, this);
}
public NodeKey getTargetNodeKey() {
@Override
public DataModificationTransaction startDataModification() {
- return dps.beginTransaction();
+ DataModificationTransaction dmt = dps.beginTransaction();
+ dmt.registerListener(this.srScheduler);
+ return dmt;
}
public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
final Short tableId = msgManager.isExpectedTableTransaction(transaction);
if (tableId != null) {
- final DataModificationTransaction trans = dps.beginTransaction();
+ final DataModificationTransaction trans = this.startDataModification();
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
}
public synchronized void updateGroupFeatures(GroupFeatures notification) {
- final DataModificationTransaction trans = dps.beginTransaction();
+ final DataModificationTransaction trans = this.startDataModification();
final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(targetNodeKey);
}
public synchronized void updateMeterFeatures(MeterFeatures features) {
- final DataModificationTransaction trans = dps.beginTransaction();
+ final DataModificationTransaction trans = this.startDataModification();
final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(targetNodeKey);
}
public synchronized void cleanStaleStatistics() {
- final DataModificationTransaction trans = dps.beginTransaction();
- final long now = System.nanoTime();
-
- flowStats.cleanup(trans, now);
- groupDescStats.cleanup(trans, now);
- groupStats.cleanup(trans, now);
- meterConfigStats.cleanup(trans, now);
- meterStats.cleanup(trans, now);
- nodeConnectorStats.cleanup(trans, now);
- queueStats.cleanup(trans, now);
+ final DataModificationTransaction trans = this.startDataModification();
+
+ flowStats.cleanup(trans);
+ groupDescStats.cleanup(trans);
+ groupStats.cleanup(trans);
+ meterConfigStats.cleanup(trans);
+ meterStats.cleanup(trans);
+ nodeConnectorStats.cleanup(trans);
+ queueStats.cleanup(trans);
msgManager.cleanStaleTransactionIds();
trans.commit();
public synchronized void requestPeriodicStatistics() {
logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
- flowTableStats.request();
-
- // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
- // comes back -- we do not have any tables anyway.
- final Collection<TableKey> tables = flowTableStats.getTables();
- logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
- for (final TableKey key : tables) {
- logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
- flowStats.requestAggregateFlows(key);
- }
-
- flowStats.requestAllFlowsAllTables();
- nodeConnectorStats.request();
- groupStats.request();
- groupDescStats.request();
- meterStats.request();
- meterConfigStats.request();
- queueStats.request();
+ this.srScheduler.addRequestToSchedulerQueue(flowTableStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(flowStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(groupStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(groupDescStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(meterStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(meterConfigStats);
+
+ this.srScheduler.addRequestToSchedulerQueue(queueStats);
}
-
+
public synchronized void start(final Timer timer) {
flowStats.start(dps);
groupDescStats.start(dps);
private static final Logger logger = LoggerFactory.getLogger(QueueStatsTracker.class);
private final OpendaylightQueueStatisticsService queueStatsService;
- QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context, long lifetimeNanos) {
- super(context, lifetimeNanos);
+ QueueStatsTracker(OpendaylightQueueStatisticsService queueStatsService, final FlowCapableContext context) {
+ super(context);
this.queueStatsService = queueStatsService;
}
return queueEntry;
}
+ @Override
public void request() {
if (queueStatsService != null) {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightQueueStatisticsService queueStatsService;
+
+ private final StatisticsRequestScheduler srScheduler;
public StatisticsProvider(final DataProviderService dataService) {
this.dps = Preconditions.checkNotNull(dataService);
+ this.srScheduler = new StatisticsRequestScheduler();
}
private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
-
+ this.srScheduler.start();
+
// Start receiving notifications
this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key,
flowStatsService, flowTableStatsService, groupStatsService,
- meterStatsService, portStatsService, queueStatsService);
+ meterStatsService, portStatsService, queueStatsService,srScheduler);
final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
if (old == null) {
spLogger.debug("Started node handler for {}", key.getId());
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main responsibility of the class is to check the MD-SAL data store read/write
+ * transaction accumulation level and send statistics request if number of pending
+ * read/write transactions are zero.
+ * @author avishnoi@in.ibm.com
+ *
+ */
+@SuppressWarnings("rawtypes")
+public class StatisticsRequestScheduler implements DataTransactionListener {
+
+ private static final Logger srsLogger = LoggerFactory.getLogger(StatisticsRequestScheduler.class);
+ private final Timer timer = new Timer("request-monitor", true);
+
+ // We need ordered retrieval, and O(1) contains operation
+ private final Map<AbstractStatsTracker,Integer> requestQueue =
+ Collections.synchronizedMap(new LinkedHashMap<AbstractStatsTracker,Integer>());
+
+ private Long PendingTransactions;
+
+ private long lastRequestTime = System.nanoTime();
+
+ private static final long REQUEST_MONITOR_INTERVAL = 1000;
+
+ private final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ long now = System.nanoTime();
+ if(now > lastRequestTime+TimeUnit.MILLISECONDS.toNanos(REQUEST_MONITOR_INTERVAL)){
+ requestStatistics();
+ }
+ }
+ };
+
+ public StatisticsRequestScheduler(){
+ PendingTransactions = (long) 0;
+ }
+
+ public void addRequestToSchedulerQueue(AbstractStatsTracker statsRequest){
+ requestQueue.put(statsRequest, null);
+ }
+
+ public AbstractStatsTracker getNextRequestFromSchedulerQueue(){
+ //Remove first element
+ AbstractStatsTracker stats = null;
+ synchronized(requestQueue){
+ Iterator<Map.Entry<AbstractStatsTracker, Integer>> nodesItr = requestQueue.entrySet().iterator();
+ if(nodesItr.hasNext()){
+ stats = nodesItr.next().getKey();
+ srsLogger.debug("{} chosen up for execution",stats.getNodeRef());
+ nodesItr.remove();
+ return stats;
+ }
+ }
+ return stats;
+ }
+
+ private void requestStatistics(){
+ AbstractStatsTracker stats = this.getNextRequestFromSchedulerQueue();
+ if(stats != null) {
+ stats.request();
+ stats.increaseRequestCounter();
+ }
+ }
+ @Override
+ public void onStatusUpdated(DataModificationTransaction transaction, TransactionStatus status) {
+
+ AbstractStatsTracker stats = null;
+ synchronized(PendingTransactions){
+ switch(status){
+ case SUBMITED:
+ this.PendingTransactions++;
+ break;
+ case COMMITED:
+ case FAILED:
+ this.PendingTransactions--;
+ if(PendingTransactions == 0){
+ lastRequestTime = System.nanoTime();
+ stats = this.getNextRequestFromSchedulerQueue();
+ }
+ srsLogger.debug("Pending MD-SAL transactions : {} & Scheduler queue size : {}",this.PendingTransactions,this.requestQueue.size());
+ break;
+ default:
+ break;
+ }
+ }
+ if(stats != null){
+ stats.request();
+ stats.increaseRequestCounter();
+ }
+ }
+
+ public void start(){
+ timer.schedule(task, 0, REQUEST_MONITOR_INTERVAL);
+ }
+}