*/
package org.opendaylight.bgpcep.pcep.topology.provider;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
+ private static final VarHandle NEXT_TIMEOUT;
+
+ static {
+ try {
+ NEXT_TIMEOUT = MethodHandles.lookup().findVarHandle(TopologyStatsProvider.class, "nextTimeout",
+ Timeout.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
// This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
// which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
@GuardedBy("this")
private final Map<KeyedInstanceIdentifier<Node, NodeKey>, Reg<?>> statsMap = new HashMap<>();
+ private final ExecutorService executor;
+ private final long updateIntervalNanos;
+ private final DataBroker dataBroker;
+ private final Timer timer;
+
// Note: null indicates we have been shut down
- @GuardedBy("this")
- private DataBroker dataBroker;
+ private volatile Timeout nextTimeout;
@GuardedBy("this")
private TransactionChain transactionChain;
- @GuardedBy("this")
- private final ScheduledFuture<?> scheduleTask;
-
- TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds) {
- this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
- }
- TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds,
- final ScheduledExecutorService scheduler) {
+ TopologyStatsProvider(final DataBroker dataBroker, final Timer timer, final int updateIntervalSeconds) {
this.dataBroker = requireNonNull(dataBroker);
- LOG.info("Initializing TopologyStatsProvider service.");
- scheduleTask = scheduler.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- updateStats();
- }
- }, 0, updateIntervalSeconds, TimeUnit.SECONDS);
+ this.timer = requireNonNull(timer);
+ updateIntervalNanos = TimeUnit.SECONDS.toNanos(updateIntervalSeconds);
+ checkArgument(updateIntervalNanos > 0, "Invalid update interval %s", updateIntervalNanos);
+
+ executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("odl-pcep-stats-%d")
+ .build());
+
+ nextTimeout = timer.newTimeout(this::updateStats, updateIntervalNanos, TimeUnit.NANOSECONDS);
+ LOG.info("TopologyStatsProvider updating every {} seconds", updateIntervalSeconds);
}
// FIXME: there should be no further tasks, hence this should not be needed
// FIXME: if it ends up being needed, it needs to be asynchronous
void shutdown() throws InterruptedException, ExecutionException {
- if (scheduleTask.cancel(true)) {
- LOG.info("Closing TopologyStatsProvider service.");
- lockedShutdown();
- } else {
+ final var local = (Timeout) NEXT_TIMEOUT.getAndSet(null);
+ if (local == null) {
LOG.debug("TopologyStatsProvider already shut down");
+ return;
+ }
+ if (!local.cancel()) {
+ LOG.debug("Failed to cancel timeout");
}
+ lockedShutdown();
}
private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
+ LOG.info("Closing TopologyStatsProvider service.");
+ executor.shutdownNow().forEach(Runnable::run);
+
// Try to get a transaction chain and indicate we are done
final TransactionChain chain = accessChain();
transactionChain = null;
- dataBroker = null;
if (chain == null) {
// Belt & suspenders so we do not error out elsewhere
@Holding("this")
private @Nullable TransactionChain accessChain() {
- if (transactionChain == null && dataBroker != null) {
+ if (nextTimeout == null) {
+ return null;
+ }
+
+ var local = transactionChain;
+ if (local == null) {
// Re-create the chain if we have not been shut down
- transactionChain = dataBroker.createMergingTransactionChain(this);
+ transactionChain = local = dataBroker.createMergingTransactionChain(this);
+ }
+ return local;
+ }
+
+ private void updateStats(final Timeout timeout) {
+ if (timeout.equals(nextTimeout)) {
+ executor.execute(this::updateStats);
+ } else {
+ LOG.debug("Ignoring unexpected timeout {}", timeout);
}
- return transactionChain;
}
@SuppressWarnings("checkstyle:IllegalCatch")
final TransactionChain chain = accessChain();
if (chain == null) {
// Already closed, do not bother
+ LOG.debug("Skipping update on shut down");
return;
}
+ final long now = System.nanoTime();
final WriteTransaction tx = chain.newWriteOnlyTransaction();
try {
for (var entry : statsMap.entrySet()) {
} catch (Exception e) {
LOG.warn("Failed to prepare Tx for PCEP stats update", e);
tx.cancel();
+ schedule(now);
return;
}
@Override
public void onSuccess(final CommitInfo result) {
LOG.debug("Successfully committed Topology stats update");
+ schedule(now);
}
@Override
public void onFailure(final Throwable ex) {
LOG.error("Failed to commit Topology stats update", ex);
+ // Wait a complete cycle
+ schedule(System.nanoTime());
}
}, MoreExecutors.directExecutor());
}
+ private synchronized void schedule(final long lastNow) {
+ if (nextTimeout != null) {
+ lockedSchedule(lastNow);
+ } else {
+ LOG.debug("Skipping schedule on shutdown");
+ }
+ }
+
+ @Holding("this")
+ private void lockedSchedule(final long lastNow) {
+ final long now = System.nanoTime();
+
+ // TODO: can we do something smarter?
+ long delay = lastNow + updateIntervalNanos;
+ while (delay - now < 0) {
+ delay += updateIntervalNanos;
+ }
+ nextTimeout = timer.newTimeout(this::updateStats, lastNow, TimeUnit.NANOSECONDS);
+ }
+
@Override
public synchronized void onTransactionChainFailed(final TransactionChain chain,
final Transaction transaction, final Throwable cause) {
@Override
public synchronized <T extends PcepSessionState> ObjectRegistration<T> bind(
final KeyedInstanceIdentifier<Node, NodeKey> nodeId, final T sessionState) {
- if (dataBroker == null) {
+ if (nextTimeout == null) {
LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
return NoOpObjectRegistration.of(sessionState);
}