Use a Timer for TopologyStatsProvider 51/101951/9
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 2 Aug 2022 23:10:31 +0000 (01:10 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 5 Aug 2022 11:31:24 +0000 (13:31 +0200)
At the end of the day, we do not need a ScheduleExectorService if we
take advantate of our global Timer.

JIRA: BGPCEP-1011
Change-Id: I5b2d8f0e2e5a50e9c154188dbd51b747dd492ddf
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyTracker.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyStatsProvider.java

index 9dbabf549d5cba3fb4ea44d353a72d01b4583a9e..f371d1d40efcf1c13bdd95c93656c75600bb916d 100644 (file)
@@ -115,7 +115,7 @@ public final class PCEPTopologyTracker
         this.pcepDispatcher = requireNonNull(pcepDispatcher);
         this.instructionSchedulerFactory = requireNonNull(instructionSchedulerFactory);
         this.pceServerProvider = requireNonNull(pceServerProvider);
-        statsProvider = new TopologyStatsProvider(dataBroker, updateIntervalSeconds);
+        statsProvider = new TopologyStatsProvider(dataBroker, timer, updateIntervalSeconds);
         statsRpcs = new TopologyStatsRpcServiceImpl(dataBroker);
         statsReg = rpcProviderRegistry.registerRpcImplementation(PcepTopologyStatsRpcService.class, statsRpcs);
 
index d10b9b9a8692767a9e80008f3e955973416f5506..7ae896ec3c72b445332b9008c6b7beb0949671da 100644 (file)
@@ -7,20 +7,24 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
@@ -48,6 +52,16 @@ import org.slf4j.LoggerFactory;
 
 final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
+    private static final VarHandle NEXT_TIMEOUT;
+
+    static {
+        try {
+            NEXT_TIMEOUT = MethodHandles.lookup().findVarHandle(TopologyStatsProvider.class, "nextTimeout",
+                Timeout.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 
     // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
     // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
@@ -60,46 +74,52 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
     private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
     @GuardedBy("this")
     private final Map<KeyedInstanceIdentifier<Node, NodeKey>, Reg<?>> statsMap = new HashMap<>();
+    private final ExecutorService executor;
+    private final long updateIntervalNanos;
+    private final DataBroker dataBroker;
+    private final Timer timer;
+
     // Note: null indicates we have been shut down
-    @GuardedBy("this")
-    private DataBroker dataBroker;
+    private volatile Timeout nextTimeout;
     @GuardedBy("this")
     private TransactionChain transactionChain;
-    @GuardedBy("this")
-    private final ScheduledFuture<?> scheduleTask;
-
-    TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds) {
-        this(dataBroker, updateIntervalSeconds, Executors.newScheduledThreadPool(1));
-    }
 
-    TopologyStatsProvider(final DataBroker dataBroker, final int updateIntervalSeconds,
-            final ScheduledExecutorService scheduler) {
+    TopologyStatsProvider(final DataBroker dataBroker, final Timer timer, final int updateIntervalSeconds) {
         this.dataBroker = requireNonNull(dataBroker);
-        LOG.info("Initializing TopologyStatsProvider service.");
-        scheduleTask = scheduler.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                updateStats();
-            }
-        }, 0, updateIntervalSeconds, TimeUnit.SECONDS);
+        this.timer = requireNonNull(timer);
+        updateIntervalNanos = TimeUnit.SECONDS.toNanos(updateIntervalSeconds);
+        checkArgument(updateIntervalNanos > 0, "Invalid update interval %s", updateIntervalNanos);
+
+        executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("odl-pcep-stats-%d")
+            .build());
+
+        nextTimeout = timer.newTimeout(this::updateStats, updateIntervalNanos, TimeUnit.NANOSECONDS);
+        LOG.info("TopologyStatsProvider updating every {} seconds", updateIntervalSeconds);
     }
 
     // FIXME: there should be no further tasks, hence this should not be needed
     // FIXME: if it ends up being needed, it needs to be asynchronous
     void shutdown() throws InterruptedException, ExecutionException {
-        if (scheduleTask.cancel(true)) {
-            LOG.info("Closing TopologyStatsProvider service.");
-            lockedShutdown();
-        } else {
+        final var local = (Timeout) NEXT_TIMEOUT.getAndSet(null);
+        if (local == null) {
             LOG.debug("TopologyStatsProvider already shut down");
+            return;
+        }
+        if (!local.cancel()) {
+            LOG.debug("Failed to cancel timeout");
         }
+        lockedShutdown();
     }
 
     private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
+        LOG.info("Closing TopologyStatsProvider service.");
+        executor.shutdownNow().forEach(Runnable::run);
+
         // Try to get a transaction chain and indicate we are done
         final TransactionChain chain = accessChain();
         transactionChain = null;
-        dataBroker = null;
 
         if (chain == null) {
             // Belt & suspenders so we do not error out elsewhere
@@ -125,11 +145,24 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
 
     @Holding("this")
     private @Nullable TransactionChain accessChain() {
-        if (transactionChain == null && dataBroker != null) {
+        if (nextTimeout == null) {
+            return null;
+        }
+
+        var local = transactionChain;
+        if (local == null) {
             // Re-create the chain if we have not been shut down
-            transactionChain = dataBroker.createMergingTransactionChain(this);
+            transactionChain = local = dataBroker.createMergingTransactionChain(this);
+        }
+        return local;
+    }
+
+    private void updateStats(final Timeout timeout) {
+        if (timeout.equals(nextTimeout)) {
+            executor.execute(this::updateStats);
+        } else {
+            LOG.debug("Ignoring unexpected timeout {}", timeout);
         }
-        return transactionChain;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -137,9 +170,11 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
         final TransactionChain chain = accessChain();
         if (chain == null) {
             // Already closed, do not bother
+            LOG.debug("Skipping update on shut down");
             return;
         }
 
+        final long now = System.nanoTime();
         final WriteTransaction tx = chain.newWriteOnlyTransaction();
         try {
             for (var entry : statsMap.entrySet()) {
@@ -157,6 +192,7 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
         } catch (Exception e) {
             LOG.warn("Failed to prepare Tx for PCEP stats update", e);
             tx.cancel();
+            schedule(now);
             return;
         }
 
@@ -164,15 +200,38 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
             @Override
             public void onSuccess(final CommitInfo result) {
                 LOG.debug("Successfully committed Topology stats update");
+                schedule(now);
             }
 
             @Override
             public void onFailure(final Throwable ex) {
                 LOG.error("Failed to commit Topology stats update", ex);
+                // Wait a complete cycle
+                schedule(System.nanoTime());
             }
         }, MoreExecutors.directExecutor());
     }
 
+    private synchronized void schedule(final long lastNow) {
+        if (nextTimeout != null) {
+            lockedSchedule(lastNow);
+        } else {
+            LOG.debug("Skipping schedule on shutdown");
+        }
+    }
+
+    @Holding("this")
+    private void lockedSchedule(final long lastNow) {
+        final long now = System.nanoTime();
+
+        // TODO: can we do something smarter?
+        long delay = lastNow + updateIntervalNanos;
+        while (delay - now < 0) {
+            delay += updateIntervalNanos;
+        }
+        nextTimeout = timer.newTimeout(this::updateStats, lastNow, TimeUnit.NANOSECONDS);
+    }
+
     @Override
     public synchronized void onTransactionChainFailed(final TransactionChain chain,
             final Transaction transaction, final Throwable cause) {
@@ -194,7 +253,7 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
     @Override
     public synchronized <T extends PcepSessionState> ObjectRegistration<T> bind(
             final KeyedInstanceIdentifier<Node, NodeKey> nodeId, final T sessionState) {
-        if (dataBroker == null) {
+        if (nextTimeout == null) {
             LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
             return NoOpObjectRegistration.of(sessionState);
         }