import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
abstract class AbstractStatsTracker<I, K> {
private static final Logger logger = LoggerFactory.getLogger(AbstractStatsTracker.class);
- private static final Function<RpcResult<? extends TransactionAware>, TransactionId> FUNCTION =
- new Function<RpcResult<? extends TransactionAware>, TransactionId>() {
+ private final FutureCallback<RpcResult<? extends TransactionAware>> callback =
+ new FutureCallback<RpcResult<? extends TransactionAware>>() {
@Override
- public TransactionId apply(RpcResult<? extends TransactionAware> input) {
- if (!input.isSuccessful()) {
- logger.debug("Statistics request failed: {}", input.getErrors());
- throw new RPCFailedException("Failed to send statistics request", input.getErrors());
+ public void onSuccess(RpcResult<? extends TransactionAware> result) {
+ if (result.isSuccessful()) {
+ final TransactionId id = result.getResult().getTransactionId();
+ if (id == null) {
+ final Throwable t = new UnsupportedOperationException("No protocol support");
+ t.fillInStackTrace();
+ onFailure(t);
+ } else {
+ context.registerTransaction(id);
+ }
+ } else {
+ logger.debug("Statistics request failed: {}", result.getErrors());
+
+ final Throwable t = new RPCFailedException("Failed to send statistics request", result.getErrors());
+ t.fillInStackTrace();
+ onFailure(t);
}
+ }
- return input.getResult().getTransactionId();
+ @Override
+ public void onFailure(Throwable t) {
+ logger.debug("Failed to send statistics request", t);
}
};
}
protected final <T extends TransactionAware> void requestHelper(Future<RpcResult<T>> future) {
- context.registerTransaction(Futures.transform(JdkFutureAdapters.listenInPoolThread(future), FUNCTION));
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future), callback);
}
protected final DataModificationTransaction startTransaction() {
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Interface exposed to AbstractStatsTracker by its parent NodeStatisticsHandler.
* While we could simply exist without this interface, its purpose is to document
InstanceIdentifier<Node> getNodeIdentifier();
NodeRef getNodeRef();
DataModificationTransaction startDataModification();
- void registerTransaction(ListenableFuture<TransactionId> future);
- void registerTableTransaction(ListenableFuture<TransactionId> future, Short tableId);
+ void registerTransaction(TransactionId id);
+ void registerTableTransaction(TransactionId id, Short tableId);
}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
* @author avishnoi@in.ibm.com
*
*/
-public class MultipartMessageManager {
- private static final int NUMBER_OF_WAIT_CYCLES = 2;
-
+class MultipartMessageManager {
/*
* Map for tx id and type of request, to keep track of all the request sent
* by Statistics Manager. Statistics Manager won't entertain any multipart
* Because flow table statistics multi part response do not contains the table id.
*/
private final Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<>();
+ private final long lifetimeNanos;
+
+ public MultipartMessageManager(long lifetimeNanos) {
+ this.lifetimeNanos = lifetimeNanos;
+ }
private static final class TxIdEntry {
private final TransactionId txId;
}
}
- private static Long getExpiryTime(){
- return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(
- StatisticsProvider.STATS_COLLECTION_MILLIS*NUMBER_OF_WAIT_CYCLES);
+ private Long getExpiryTime() {
+ return System.nanoTime() + lifetimeNanos;
}
public void cleanStaleTransactionIds() {
import java.util.Collection;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* This class handles the lifecycle of per-node statistics. It receives data
*/
public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
+
+ private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
+ private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final int NUMBER_OF_WAIT_CYCLES = 2;
- private final MultipartMessageManager msgManager = new MultipartMessageManager();
+ private final MultipartMessageManager msgManager;
private final InstanceIdentifier<Node> targetNodeIdentifier;
private final FlowStatsTracker flowStats;
private final FlowTableStatsTracker flowTableStats;
private final DataProviderService dps;
private final NodeRef targetNodeRef;
private final NodeKey targetNodeKey;
+ private final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ requestPeriodicStatistics();
+ cleanStaleStatistics();
+ }
+ };
public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
final OpendaylightFlowStatisticsService flowStatsService,
this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
this.targetNodeRef = new NodeRef(targetNodeIdentifier);
- final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+ final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+ msgManager = new MultipartMessageManager(lifetimeNanos);
flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
queueStats.request();
}
- public synchronized void start() {
+ public synchronized void start(final Timer timer) {
flowStats.start(dps);
groupDescStats.start(dps);
groupStats.start(dps);
meterStats.start(dps);
queueStats.start(dps);
+ timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS);
+
+ logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS);
+
requestPeriodicStatistics();
}
@Override
public synchronized void close() {
+ task.cancel();
flowStats.close();
groupDescStats.close();
groupStats.close();
}
@Override
- public void registerTransaction(final ListenableFuture<TransactionId> future) {
- Futures.addCallback(future, new FutureCallback<TransactionId>() {
- @Override
- public void onSuccess(TransactionId result) {
- msgManager.recordExpectedTransaction(result);
- logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
- }
- });
+ public void registerTransaction(TransactionId id) {
+ msgManager.recordExpectedTransaction(id);
+ logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey);
}
@Override
- public void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
- Futures.addCallback(future, new FutureCallback<TransactionId>() {
- @Override
- public void onSuccess(TransactionId result) {
- msgManager.recordExpectedTableTransaction(result, id);
- logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
- }
- });
+ public void registerTableTransaction(final TransactionId id, final Short table) {
+ msgManager.recordExpectedTableTransaction(id, table);
+ logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table);
}
}
import java.util.Collection;
import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
*
*/
public class StatisticsProvider implements AutoCloseable {
- public static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
-
private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
this.flowCapableTrackerRegistration = dps.registerDataChangeListener(fcnId,
new FlowCapableTracker(this, fcnId));
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- // Send stats requests
- for (NodeStatisticsHandler h : handlers.values()) {
- h.requestPeriodicStatistics();
- }
-
- // Perform cleanup
- for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
- nodeStatisticsAger.cleanStaleStatistics();
- }
-
- } catch (RuntimeException e) {
- spLogger.warn("Failed to request statistics", e);
- }
- }
- }, 0, STATS_COLLECTION_MILLIS);
-
- spLogger.debug("Statistics timer task with timer interval : {}ms", STATS_COLLECTION_MILLIS);
spLogger.info("Statistics Provider started.");
}
final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
if (old == null) {
spLogger.debug("Started node handler for {}", key.getId());
- h.start();
+ h.start(timer);
} else {
spLogger.debug("Prevented race on handler for {}", key.getId());
}