Rework statistics updates 62/101962/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 3 Aug 2022 14:06:00 +0000 (16:06 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 5 Aug 2022 15:55:40 +0000 (17:55 +0200)
Rather than using an unrelated transaction chain from
TopologyStatsProvider, call back into the session to perform the update
on the corresponding transaction chain.

This eliminates the problem of conflicting updates during session churn
and allows for future evolution.

JIRA: BGPCEP-1005
Change-Id: I4d70b985d1089266c108798c34a0176826016200
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyTracker.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/SessionStateRegistry.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/SessionStateUpdater.java [moved from pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/session/stats/SessionStateImpl.java with 59% similarity]
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologySessionStats.java [moved from pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/session/stats/TopologySessionStats.java with 94% similarity]
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyStatsProvider.java
pcep/topology/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java
pcep/topology/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologySessionListenerTest.java

index 7afbd42674a462e1f27d07171204a3cb308571f6..81c7836c4631dd52472eeccb0e734640034c7e47 100644 (file)
@@ -32,8 +32,6 @@ import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
-import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -93,7 +91,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
     @GuardedBy("this")
     final Map<PlspId, String> lsps = new HashMap<>();
     @GuardedBy("this")
-    private ObjectRegistration<SessionStateImpl> listenerState;
+    private ObjectRegistration<SessionStateUpdater> listenerState;
 
     // FIXME: clarify lifecycle rules of this map, most notably the interaction of multiple SrpIdNumbers
     @GuardedBy("this")
@@ -181,7 +179,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
 
                 final var storeFuture = state.storeNode(topologyAugment,
                         new Node1Builder().setPathComputationClient(pccBuilder.build()).build());
-                listenerState = stateRegistry.bind(nodeId, new SessionStateImpl(this, psession));
+                listenerState = stateRegistry.bind(new SessionStateUpdater(this, psession, state));
                 LOG.info("Session with {} attached to topology node {}", peerAddress, nodeId);
 
                 storeFuture.addCallback(new FutureCallback<CommitInfo>() {
@@ -703,7 +701,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
         return RpcResultBuilder.<Void>success().buildFuture();
     }
 
-    final synchronized @NonNull SessionStateImpl listenerState() {
+    final synchronized @NonNull SessionStateUpdater listenerState() {
         return verifyNotNull(listenerState).getInstance();
     }
 
index f371d1d40efcf1c13bdd95c93656c75600bb916d..70a164369237c1f58d858e4ed902c7acc150a6be 100644 (file)
@@ -19,7 +19,6 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
@@ -115,7 +114,7 @@ public final class PCEPTopologyTracker
         this.pcepDispatcher = requireNonNull(pcepDispatcher);
         this.instructionSchedulerFactory = requireNonNull(instructionSchedulerFactory);
         this.pceServerProvider = requireNonNull(pceServerProvider);
-        statsProvider = new TopologyStatsProvider(dataBroker, timer, updateIntervalSeconds);
+        statsProvider = new TopologyStatsProvider(timer, updateIntervalSeconds);
         statsRpcs = new TopologyStatsRpcServiceImpl(dataBroker);
         statsReg = rpcProviderRegistry.registerRpcImplementation(PcepTopologyStatsRpcService.class, statsRpcs);
 
@@ -181,15 +180,7 @@ public final class PCEPTopologyTracker
             LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
         }
 
-        try {
-            statsProvider.shutdown();
-        } catch (ExecutionException e) {
-            LOG.warn("Failed to close statistics provider", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for statistics provider shutdown", e);
-            Thread.currentThread().interrupt();
-        }
-
+        statsProvider.shutdown();
         LOG.info("PCEP Topology tracker shut down");
     }
 
index aabd0124ef3cdb5e1f3bee6519c53a790bc7ee40..0aaae2dbd0e329020d04afc96869c3f865e970a2 100644 (file)
@@ -9,24 +9,17 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import edu.umd.cs.findbugs.annotations.CheckReturnValue;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
 /**
- * Topology Node Sessions stats handler. Will store Session stats on DS per each Topology Node registered.
+ * Topology Node Sessions stats handler. It is resposible for dispatching update requests to each registrant.
  */
 interface SessionStateRegistry {
     /**
      * Register session to Session stats Registry handler.
      *
-     * @param nodeId       Identifier of the topology node where it will be stored session stats under DS
-     * @param sessionState containing all Stats Session information
+     * @param updater A {@link SessionStateUpdater}
      */
-    // FIXME: BGPCEP-1105: nodeId is a bit superfluous, lifecycle is driven by AbstractTopologySessionListener
     @CheckReturnValue
-    <T extends PcepSessionState> @NonNull ObjectRegistration<T> bind(
-        @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodeId, @NonNull T sessionState);
+    @NonNull ObjectRegistration<SessionStateUpdater> bind(@NonNull SessionStateUpdater updater);
 }
@@ -5,15 +5,19 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.bgpcep.pcep.topology.provider.session.stats;
+package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FluentFuture;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.protocol.pcep.PCEPSessionState;
 import org.opendaylight.protocol.util.StatisticsUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.Tlvs3;
@@ -23,22 +27,27 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.iet
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.PcepEntityIdStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulCapabilitiesStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulMessagesStatsAugBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPrefBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.MessagesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerCapabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerCapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.reply.time.grouping.ReplyTimeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
 import org.opendaylight.yangtools.yang.common.Uint16;
 import org.opendaylight.yangtools.yang.common.Uint32;
 
-public final class SessionStateImpl implements PcepSessionState {
+/**
+ * Callback for updating session state registered with {@link SessionStateRegistry}.
+ */
+final class SessionStateUpdater {
     private final Stopwatch sessionUpDuration = Stopwatch.createStarted();
     private final TopologySessionStats topologySessionStats;
     private final PCEPSessionState session;
@@ -64,9 +73,13 @@ public final class SessionStateImpl implements PcepSessionState {
     @GuardedBy("this")
     private long sentInitMessageCount;
 
-    public SessionStateImpl(final TopologySessionStats topologySessionStats, final PCEPSessionState session) {
+    private final TopologyNodeState node;
+
+    SessionStateUpdater(final TopologySessionStats topologySessionStats, final PCEPSessionState session,
+            final TopologyNodeState node) {
         this.topologySessionStats = requireNonNull(topologySessionStats);
         this.session = requireNonNull(session);
+        this.node = requireNonNull(node);
 
         final SpeakerEntityId entityId = extractEntityId(session.getLocalOpen());
         localPref = entityId == null ? session.getLocalPref() : new LocalPrefBuilder(session.getLocalPref())
@@ -86,93 +99,88 @@ public final class SessionStateImpl implements PcepSessionState {
         return null;
     }
 
-    public synchronized void processRequestStats(final long durationMillis) {
-        if (minReplyMillis == 0 || durationMillis < minReplyMillis) {
-            minReplyMillis = durationMillis;
-        }
-        if (durationMillis > maxReplyMillis) {
-            maxReplyMillis = durationMillis;
-        }
-
-        requestCount++;
-        totalReplyMillis += durationMillis;
-    }
-
-    public synchronized void updateLastReceivedRptMsg() {
-        receivedRptMessageCount++;
-        receivedRptMessageTime = StatisticsUtil.getCurrentTimestampInSeconds();
-    }
-
-    public synchronized void updateStatefulSentMsg(final Message msg) {
-        if (msg instanceof Pcinitiate) {
-            sentInitMessageCount++;
-        } else if (msg instanceof Pcupd) {
-            sentUpdMessageCount++;
-        }
-    }
-
-    @Override
-    public String getSessionDuration() {
-        return StatisticsUtil.formatElapsedTime(sessionUpDuration.elapsed(TimeUnit.SECONDS));
-    }
+    @NonNull FluentFuture<? extends @NonNull CommitInfo> updateStatistics() {
+        // Lockless
+        final var aug = new PcepTopologyNodeStatsAugBuilder().setPcepSessionState(toPcepSessionState()).build();
 
-    @Override
-    public Boolean getSynchronized() {
-        return topologySessionStats.isSessionSynchronized();
+        // FIXME: locking of this, check with session, etc. lifecycle
+        final var tx = node.getChain().newWriteOnlyTransaction();
+        tx.put(LogicalDatastoreType.OPERATIONAL, node.getNodeId().augmentation(PcepTopologyNodeStatsAug.class), aug);
+        return tx.commit();
     }
 
-    @Override
-    public PeerCapabilities getPeerCapabilities() {
-        return new PeerCapabilitiesBuilder()
-            .addAugmentation(new StatefulCapabilitiesStatsAugBuilder()
-                .setActive(topologySessionStats.isLspUpdateCapability())
-                .setInstantiation(topologySessionStats.isInitiationCapability())
-                .setStateful(topologySessionStats.isStatefulCapability())
-                .build())
-            .build();
+    // FIXME: add a caller
+    @NonNull FluentFuture<? extends @NonNull CommitInfo> removeStatistics() {
+        final var tx = node.getChain().newWriteOnlyTransaction();
+        tx.delete(LogicalDatastoreType.OPERATIONAL, node.getNodeId().augmentation(PcepTopologyNodeStatsAug.class));
+        return tx.commit();
     }
 
-    @Override
-    public Messages getMessages() {
+    @VisibleForTesting
+    @NonNull PcepSessionState toPcepSessionState() {
         // Note: callout to session, do not hold lock
         final Messages sessionMessages = session.getMessages();
 
         synchronized (this) {
             final long averageReply = requestCount == 0 ? 0 : Math.round((double) totalReplyMillis / requestCount);
 
-            return new MessagesBuilder(sessionMessages)
-                .setReplyTime(new ReplyTimeBuilder()
-                    .setAverageTime(Uint32.saturatedOf(averageReply))
-                    .setMaxTime(Uint32.saturatedOf(maxReplyMillis))
-                    .setMinTime(Uint32.saturatedOf(minReplyMillis))
+            return new PcepSessionStateBuilder()
+                .setLocalPref(localPref)
+                .setPeerPref(peerPref)
+                .setSessionDuration(StatisticsUtil.formatElapsedTime(sessionUpDuration.elapsed(TimeUnit.SECONDS)))
+                .setSynchronized(topologySessionStats.isSessionSynchronized())
+                .setDelegatedLspsCount(getDelegatedLspsCount())
+                .setPeerCapabilities(new PeerCapabilitiesBuilder()
+                    .addAugmentation(new StatefulCapabilitiesStatsAugBuilder()
+                        .setActive(topologySessionStats.isLspUpdateCapability())
+                        .setInstantiation(topologySessionStats.isInitiationCapability())
+                        .setStateful(topologySessionStats.isStatefulCapability())
+                        .build())
                     .build())
-                .addAugmentation(new StatefulMessagesStatsAugBuilder()
-                    .setLastReceivedRptMsgTimestamp(Uint32.saturatedOf(receivedRptMessageTime))
-                    .setReceivedRptMsgCount(Uint32.saturatedOf(receivedRptMessageCount))
-                    .setSentInitMsgCount(Uint32.saturatedOf(sentInitMessageCount))
-                    .setSentUpdMsgCount(Uint32.saturatedOf(sentUpdMessageCount))
+                .setMessages(new MessagesBuilder(sessionMessages)
+                    .setReplyTime(new ReplyTimeBuilder()
+                        .setAverageTime(Uint32.saturatedOf(averageReply))
+                        .setMaxTime(Uint32.saturatedOf(maxReplyMillis))
+                        .setMinTime(Uint32.saturatedOf(minReplyMillis))
+                        .build())
+                    .addAugmentation(new StatefulMessagesStatsAugBuilder()
+                        .setLastReceivedRptMsgTimestamp(Uint32.saturatedOf(receivedRptMessageTime))
+                        .setReceivedRptMsgCount(Uint32.saturatedOf(receivedRptMessageCount))
+                        .setSentInitMsgCount(Uint32.saturatedOf(sentInitMessageCount))
+                        .setSentUpdMsgCount(Uint32.saturatedOf(sentUpdMessageCount))
+                        .build())
                     .build())
                 .build();
         }
     }
 
-    @Override
-    public LocalPref getLocalPref() {
-        return localPref;
+    @VisibleForTesting
+    @NonNull Uint16 getDelegatedLspsCount() {
+        return Uint16.saturatedOf(topologySessionStats.getDelegatedLspsCount());
     }
 
-    @Override
-    public PeerPref getPeerPref() {
-        return peerPref;
+    synchronized void processRequestStats(final long durationMillis) {
+        if (minReplyMillis == 0 || durationMillis < minReplyMillis) {
+            minReplyMillis = durationMillis;
+        }
+        if (durationMillis > maxReplyMillis) {
+            maxReplyMillis = durationMillis;
+        }
+
+        requestCount++;
+        totalReplyMillis += durationMillis;
     }
 
-    @Override
-    public Uint16 getDelegatedLspsCount() {
-        return Uint16.saturatedOf(topologySessionStats.getDelegatedLspsCount());
+    synchronized void updateLastReceivedRptMsg() {
+        receivedRptMessageCount++;
+        receivedRptMessageTime = StatisticsUtil.getCurrentTimestampInSeconds();
     }
 
-    @Override
-    public Class<PcepSessionState> implementedInterface() {
-        return PcepSessionState.class;
+    synchronized void updateStatefulSentMsg(final Message msg) {
+        if (msg instanceof Pcinitiate) {
+            sentInitMessageCount++;
+        } else if (msg instanceof Pcupd) {
+            sentUpdMessageCount++;
+        }
     }
 }
index 7ae896ec3c72b445332b9008c6b7beb0949671da..c53ade83c30679ed1fc3879bf25ce09dbcf25407 100644 (file)
@@ -10,82 +10,39 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
+import io.netty.util.TimerTask;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.Transaction;
-import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
-import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionStateBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAug;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.stats.rev181109.PcepTopologyNodeStatsAugBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.NoOpObjectRegistration;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class TopologyStatsProvider implements SessionStateRegistry, TransactionChainListener {
+final class TopologyStatsProvider implements SessionStateRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyStatsProvider.class);
-    private static final VarHandle NEXT_TIMEOUT;
 
-    static {
-        try {
-            NEXT_TIMEOUT = MethodHandles.lookup().findVarHandle(TopologyStatsProvider.class, "nextTimeout",
-                Timeout.class);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
-
-    // This tracking looks weird. It essentially tracks when there is a pending delete transaction and skips updates --
-    // which is the okay part. The problem is that if the remove operation fails for some reason, we do not retry
-    // deletion. The other weird thing is that this is concurrent set because of removals only -- additions are always
-    // protected by the lock.
-    //
-    // FIXME: This was introduced to remedy "instance-2" of BGPCEP-901. I think we should change statsMap so that it
-    //        tracks also the intent besides PcepSessionState -- that way we can mark 'we want to remove this' and
-    //        retry in face of failing transactions.
-    private final Set<KeyedInstanceIdentifier<Node, NodeKey>> statsPendingDelete = ConcurrentHashMap.newKeySet();
-    @GuardedBy("this")
-    private final Map<KeyedInstanceIdentifier<Node, NodeKey>, Reg<?>> statsMap = new HashMap<>();
+    private final Set<Task> tasks = ConcurrentHashMap.newKeySet();
     private final ExecutorService executor;
     private final long updateIntervalNanos;
-    private final DataBroker dataBroker;
     private final Timer timer;
 
-    // Note: null indicates we have been shut down
-    private volatile Timeout nextTimeout;
-    @GuardedBy("this")
-    private TransactionChain transactionChain;
-
-    TopologyStatsProvider(final DataBroker dataBroker, final Timer timer, final int updateIntervalSeconds) {
-        this.dataBroker = requireNonNull(dataBroker);
+    TopologyStatsProvider(final Timer timer, final int updateIntervalSeconds) {
         this.timer = requireNonNull(timer);
         updateIntervalNanos = TimeUnit.SECONDS.toNanos(updateIntervalSeconds);
         checkArgument(updateIntervalNanos > 0, "Invalid update interval %s", updateIntervalNanos);
@@ -95,221 +52,145 @@ final class TopologyStatsProvider implements SessionStateRegistry, TransactionCh
             .setNameFormat("odl-pcep-stats-%d")
             .build());
 
-        nextTimeout = timer.newTimeout(this::updateStats, updateIntervalNanos, TimeUnit.NANOSECONDS);
         LOG.info("TopologyStatsProvider updating every {} seconds", updateIntervalSeconds);
     }
 
-    // FIXME: there should be no further tasks, hence this should not be needed
-    // FIXME: if it ends up being needed, it needs to be asynchronous
-    void shutdown() throws InterruptedException, ExecutionException {
-        final var local = (Timeout) NEXT_TIMEOUT.getAndSet(null);
-        if (local == null) {
+    void shutdown() {
+        if (executor.isShutdown()) {
             LOG.debug("TopologyStatsProvider already shut down");
             return;
         }
-        if (!local.cancel()) {
-            LOG.debug("Failed to cancel timeout");
-        }
-        lockedShutdown();
-    }
 
-    private synchronized void lockedShutdown() throws InterruptedException, ExecutionException {
         LOG.info("Closing TopologyStatsProvider service.");
-        executor.shutdownNow().forEach(Runnable::run);
-
-        // Try to get a transaction chain and indicate we are done
-        final TransactionChain chain = accessChain();
-        transactionChain = null;
-
-        if (chain == null) {
-            // Belt & suspenders so we do not error out elsewhere
-            LOG.warn("Cannot acquire transaction chain, skipping cleanup");
-            return;
-        }
-
-        // Issue deletes for all registered stats
-        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
-        for (final KeyedInstanceIdentifier<Node, NodeKey> statId : statsMap.keySet()) {
-            wTx.delete(LogicalDatastoreType.OPERATIONAL, statId);
+        final var toRun = executor.shutdownNow();
+        while (!tasks.isEmpty()) {
+            tasks.forEach(Task::close);
         }
-        statsMap.clear();
-
-        // Fire the transaction commit ...
-        final FluentFuture<?> future = wTx.commit();
-        // ... close the transaction chain ...
-        chain.close();
-        // ... and wait for transaction commit to complete
-        LOG.debug("Awaiting finish of TopologyStatsProvider cleanup");
-        future.get();
+        toRun.forEach(Runnable::run);
     }
 
-    @Holding("this")
-    private @Nullable TransactionChain accessChain() {
-        if (nextTimeout == null) {
-            return null;
-        }
-
-        var local = transactionChain;
-        if (local == null) {
-            // Re-create the chain if we have not been shut down
-            transactionChain = local = dataBroker.createMergingTransactionChain(this);
+    @Override
+    public ObjectRegistration<SessionStateUpdater> bind(final SessionStateUpdater sessionState) {
+        if (executor.isShutdown()) {
+            LOG.debug("Ignoring bind of Pcep Node {}", sessionState);
+            return NoOpObjectRegistration.of(sessionState);
         }
-        return local;
-    }
 
-    private void updateStats(final Timeout timeout) {
-        if (timeout.equals(nextTimeout)) {
-            executor.execute(this::updateStats);
-        } else {
-            LOG.debug("Ignoring unexpected timeout {}", timeout);
-        }
+        final var task = new Task(sessionState);
+        tasks.add(task);
+        return task;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private synchronized void updateStats() {
-        final TransactionChain chain = accessChain();
-        if (chain == null) {
-            // Already closed, do not bother
-            LOG.debug("Skipping update on shut down");
-            return;
-        }
+    private final class Task extends AbstractObjectRegistration<SessionStateUpdater> implements TimerTask {
+        private static final VarHandle STATE;
 
-        final long now = System.nanoTime();
-        final WriteTransaction tx = chain.newWriteOnlyTransaction();
-        try {
-            for (var entry : statsMap.entrySet()) {
-                if (!statsPendingDelete.contains(entry.getKey())) {
-                    final var reg = entry.getValue();
-                    if (reg.notClosed()) {
-                        tx.put(LogicalDatastoreType.OPERATIONAL,
-                            entry.getKey().augmentation(PcepTopologyNodeStatsAug.class),
-                            new PcepTopologyNodeStatsAugBuilder()
-                                .setPcepSessionState(new PcepSessionStateBuilder(reg.getInstance()).build())
-                                .build());
-                    }
-                }
+        static {
+            try {
+                STATE = MethodHandles.lookup().findVarHandle(Task.class, "state", Object.class);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new ExceptionInInitializerError(e);
             }
-        } catch (Exception e) {
-            LOG.warn("Failed to prepare Tx for PCEP stats update", e);
-            tx.cancel();
-            schedule(now);
-            return;
         }
 
-        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
-            @Override
-            public void onSuccess(final CommitInfo result) {
-                LOG.debug("Successfully committed Topology stats update");
-                schedule(now);
-            }
-
-            @Override
-            public void onFailure(final Throwable ex) {
-                LOG.error("Failed to commit Topology stats update", ex);
-                // Wait a complete cycle
-                schedule(System.nanoTime());
-            }
-        }, MoreExecutors.directExecutor());
-    }
+        private volatile Object state;
 
-    private synchronized void schedule(final long lastNow) {
-        if (nextTimeout != null) {
-            lockedSchedule(lastNow);
-        } else {
-            LOG.debug("Skipping schedule on shutdown");
+        Task(final @NonNull SessionStateUpdater instance) {
+            super(instance);
+            state = timer.newTimeout(this, updateIntervalNanos, TimeUnit.NANOSECONDS);
         }
-    }
-
-    @Holding("this")
-    private void lockedSchedule(final long lastNow) {
-        final long now = System.nanoTime();
 
-        // TODO: can we do something smarter?
-        long delay = lastNow + updateIntervalNanos;
-        while (delay - now < 0) {
-            delay += updateIntervalNanos;
-        }
-        nextTimeout = timer.newTimeout(this::updateStats, lastNow, TimeUnit.NANOSECONDS);
-    }
-
-    @Override
-    public synchronized void onTransactionChainFailed(final TransactionChain chain,
-            final Transaction transaction, final Throwable cause) {
-        LOG.error("Transaction chain {} failed for tx {}",
-                chain, transaction != null ? transaction.getIdentifier() : null, cause);
-        chain.close();
-
-        // Do not access the transaction chain again, re-recreated it instead
-        if (chain == transactionChain) {
-            transactionChain = null;
-        }
-    }
+        @Override
+        public void run(final Timeout timeout) {
+            if (notClosed()) {
+                LOG.debug("Task {} is closed, ignoring timeout {}", this, timeout);
+                return;
+            }
 
-    @Override
-    public synchronized void onTransactionChainSuccessful(final TransactionChain chain) {
-        LOG.debug("Transaction chain {} successful.", chain);
-    }
+            final var witness = STATE.compareAndExchange(this, timeout, null);
+            if (witness != timeout) {
+                LOG.debug("Task {} ignoring unexpected timeout {} in state {}", this, timeout, witness);
+                return;
+            }
 
-    @Override
-    public synchronized <T extends PcepSessionState> ObjectRegistration<T> bind(
-            final KeyedInstanceIdentifier<Node, NodeKey> nodeId, final T sessionState) {
-        if (nextTimeout == null) {
-            LOG.debug("Ignoring bind of Pcep Node {}", nodeId);
-            return NoOpObjectRegistration.of(sessionState);
+            final var sw = Stopwatch.createStarted();
+            state = executor.submit(() -> updateStatistics(sw));
         }
 
-        final var ret = new Reg<>(sessionState, nodeId);
-        // FIXME: a replace should never happen, and hence regs are just a Set (which can be concurrent and this method
-        //        does not need synchronization
-        statsMap.put(nodeId, ret);
-        return ret;
-    }
-
-    private synchronized void removeRegistration(final @NonNull Reg<?> reg) {
-        final var nodeId = reg.nodeId;
+        private void updateStatistics(final Stopwatch sw) {
+            LOG.debug("Resumed processing task {} after {}", this, sw);
+            if (isClosed()) {
+                // Already closed
+                return;
+            }
 
-        if (!statsMap.remove(nodeId, reg)) {
-            // Already replaced by a subsequent bind()
-            LOG.debug("Ignoring overridden unbind of Pcep Node {}", nodeId);
-            return;
-        }
+            final var prevState = state;
+            if (prevState instanceof Future<?> execFuture && !execFuture.isDone()) {
+                final var future = getInstance().updateStatistics();
+                LOG.debug("Task {} update submitted in {}", this, sw);
+                state = future;
+                future.addCallback(new FutureCallback<CommitInfo>() {
+                    @Override
+                    public void onSuccess(final CommitInfo result) {
+                        LOG.debug("Task {} update completed in {}", this, sw);
+                        reschedule(future, sw.elapsed(TimeUnit.NANOSECONDS));
+                    }
 
-        final TransactionChain chain = accessChain();
-        if (chain == null) {
-            // Already closed, do not bother
-            LOG.debug("Ignoring unbind of Pcep Node {}", nodeId);
-            return;
+                    @Override
+                    public void onFailure(final Throwable cause) {
+                        LOG.debug("Task {} update failed in {}", this, sw, cause);
+                        reschedule(future, 0);
+                    }
+                }, executor);
+            } else {
+                LOG.debug("Task {} ignoring unexpected update in state {}", this, prevState);
+            }
         }
 
-        statsPendingDelete.add(nodeId);
-        final WriteTransaction wTx = chain.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.OPERATIONAL, nodeId);
-        wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
-            @Override
-            public void onSuccess(final CommitInfo result) {
-                LOG.debug("Successfully removed Pcep Node stats {}.", nodeId.getKey().getNodeId());
-                statsPendingDelete.remove(nodeId);
+        private void reschedule(final Object expectedState, final long elapsedNanos) {
+            if (isClosed()) {
+                // Already closed
+                return;
             }
-
-            @Override
-            public void onFailure(final Throwable ex) {
-                LOG.warn("Failed to remove Pcep Node stats {}.", nodeId.getKey().getNodeId(), ex);
-                statsPendingDelete.remove(nodeId);
+            var witness = STATE.compareAndExchange(this, expectedState, null);
+            if (witness != expectedState) {
+                LOG.debug("Task {} ignoring reschedule in unexpected state {}", this, witness);
+                return;
             }
-        }, MoreExecutors.directExecutor());
-    }
-
-    private final class Reg<T extends PcepSessionState> extends AbstractObjectRegistration<T> {
-        private final @NonNull KeyedInstanceIdentifier<Node, NodeKey> nodeId;
 
-        Reg(final @NonNull T instance, final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
-            super(instance);
-            this.nodeId = requireNonNull(nodeId);
+            long remainingNanos = updateIntervalNanos - elapsedNanos;
+            if (remainingNanos < 0) {
+                remainingNanos = updateIntervalNanos;
+            }
+            state = timer.newTimeout(this, remainingNanos, TimeUnit.NANOSECONDS);
         }
 
         @Override
         protected void removeRegistration() {
-            TopologyStatsProvider.this.removeRegistration(this);
+            tasks.remove(this);
+
+            final var prevState = state;
+            if (prevState instanceof Timeout timeout) {
+                timeout.cancel();
+                STATE.compareAndSet(this, prevState, null);
+            } else if (prevState instanceof Future<?> future) {
+                if (!(future instanceof FluentFuture)) {
+                    future.cancel(false);
+                    STATE.compareAndSet(this, prevState, null);
+                }
+            } else {
+                LOG.warn("Task {} in unexpected state {}", this, prevState);
+            }
+            getInstance().removeStatistics().addCallback(new FutureCallback<CommitInfo>() {
+                @Override
+                public void onSuccess(final CommitInfo result) {
+                    LOG.debug("Task {} removed state", this);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    LOG.warn("Task {} failed to remove state", this, cause);
+                }
+            }, MoreExecutors.directExecutor());
         }
     }
 }
index 6223c9ed9d64092e33c3dec2c758e235246f806c..c89e3149e0ff32335aa26dfdb9fdd7356f4b2873 100644 (file)
@@ -37,7 +37,6 @@ import org.opendaylight.protocol.util.InetSocketAddressUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.graph.rev220720.graph.topology.GraphKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.explicit.route.object.Ero;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.explicit.route.object.EroBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.explicit.route.object.ero.Subobject;
@@ -120,8 +119,8 @@ public abstract class AbstractPCEPSessionTest extends AbstractConcurrentDataBrok
         doReturn(pipeline).when(pipeline).replace(any(ChannelHandler.class), any(String.class),
             any(ChannelHandler.class));
         doReturn(eventLoop).when(clientListener).eventLoop();
-        doAnswer(inv -> NoOpObjectRegistration.of(inv.getArgument(1, PcepSessionState.class)))
-            .when(stateRegistry).bind(any(), any());
+        doAnswer(inv -> NoOpObjectRegistration.of(inv.getArgument(0, SessionStateUpdater.class)))
+            .when(stateRegistry).bind(any());
         doReturn(null).when(eventLoop).schedule(any(Runnable.class), any(long.class), any(TimeUnit.class));
         doReturn(true).when(clientListener).isActive();
         final InetSocketAddress ra = new InetSocketAddress(testAddress, 4189);
index 5cd1d4d3a8f5507c0d8fb4a8eb5b710ad66f6f7c..800560e8b680a0ecea7abbe0f25abfb5f09efab9 100644 (file)
@@ -65,9 +65,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.iet
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Close;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulCapabilitiesStatsAug;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulMessagesStatsAug;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.grouping.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.reply.time.grouping.ReplyTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.endpoints.address.family.Ipv4CaseBuilder;
@@ -118,15 +118,16 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
     @Test
     public void testPCEPTopologySessionListener() throws Exception {
         listener.onSessionUp(session);
-        final PcepSessionState listenerState = listener.listenerState();
-        final LocalPref state = listenerState.getLocalPref();
+        final SessionStateUpdater listenerState = listener.listenerState();
+        final PcepSessionState sessionState = listenerState.toPcepSessionState();
+        final LocalPref state = sessionState.getLocalPref();
         assertNotNull(state);
         assertEquals(DEAD_TIMER, state.getDeadtimer().shortValue());
         assertEquals(KEEP_ALIVE, state.getKeepalive().shortValue());
         assertEquals(Uint16.ZERO, state.getSessionId());
         assertEquals(testAddress, state.getIpAddress());
 
-        final PeerPref peerState = listenerState.getPeerPref();
+        final PeerPref peerState = sessionState.getPeerPref();
         assertEquals(testAddress, peerState.getIpAddress());
         assertEquals(DEAD_TIMER, peerState.getDeadtimer().shortValue());
         assertEquals(KEEP_ALIVE, peerState.getKeepalive().shortValue());
@@ -170,16 +171,16 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         });
 
         // check stats
-        checkEquals(() -> assertEquals(1, listenerState.getDelegatedLspsCount().intValue()));
+        checkEquals(() -> assertEquals(Uint16.ONE, listenerState.getDelegatedLspsCount()));
         checkEquals(() -> assertTrue(listener.isSessionSynchronized()));
-        checkEquals(() -> assertTrue(listenerState.getMessages()
+        checkEquals(() -> assertTrue(listenerState.toPcepSessionState().getMessages()
                 .augmentation(StatefulMessagesStatsAug.class).getLastReceivedRptMsgTimestamp().toJava() > 0));
-        checkEquals(() -> assertEquals(2, listenerState.getMessages()
-                .augmentation(StatefulMessagesStatsAug.class).getReceivedRptMsgCount().intValue()));
-        checkEquals(() -> assertEquals(1, listenerState.getMessages()
-                .augmentation(StatefulMessagesStatsAug.class).getSentInitMsgCount().intValue()));
-        checkEquals(() -> assertEquals(0, listenerState.getMessages()
-                .augmentation(StatefulMessagesStatsAug.class).getSentUpdMsgCount().intValue()));
+        checkEquals(() -> assertEquals(Uint32.TWO, listenerState.toPcepSessionState().getMessages()
+                .augmentation(StatefulMessagesStatsAug.class).getReceivedRptMsgCount()));
+        checkEquals(() -> assertEquals(Uint32.ONE, listenerState.toPcepSessionState().getMessages()
+                .augmentation(StatefulMessagesStatsAug.class).getSentInitMsgCount()));
+        checkEquals(() -> assertEquals(Uint32.ZERO, listenerState.toPcepSessionState().getMessages()
+                .augmentation(StatefulMessagesStatsAug.class).getSentUpdMsgCount()));
 
         // update-lsp
         final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.update.lsp.args
@@ -214,18 +215,18 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
             final Path path = reportedLsp.getPath().values().iterator().next();
             assertEquals(2, path.getEro().getSubobject().size());
             assertEquals(dstIpPrefix, getLastEroIpPrefix(path.getEro()));
-            assertEquals(1, listenerState.getDelegatedLspsCount().intValue());
+            assertEquals(Uint16.ONE, listenerState.getDelegatedLspsCount());
             assertTrue(listener.isSessionSynchronized());
-            final StatefulMessagesStatsAug statefulstate = listenerState.getMessages()
+            final StatefulMessagesStatsAug statefulstate = listenerState.toPcepSessionState().getMessages()
                     .augmentation(StatefulMessagesStatsAug.class);
             assertTrue(statefulstate.getLastReceivedRptMsgTimestamp().toJava() > 0);
             assertEquals(3, statefulstate.getReceivedRptMsgCount().intValue());
             assertEquals(1, statefulstate.getSentInitMsgCount().intValue());
             assertEquals(1, statefulstate.getSentUpdMsgCount().intValue());
-            final ReplyTime replyTime = listenerState.getMessages().getReplyTime();
+            final ReplyTime replyTime = listenerState.toPcepSessionState().getMessages().getReplyTime();
             assertTrue(replyTime.getAverageTime().toJava() > 0);
             assertTrue(replyTime.getMaxTime().toJava() > 0);
-            final StatefulCapabilitiesStatsAug statefulCapabilities = listenerState
+            final StatefulCapabilitiesStatsAug statefulCapabilities = listenerState.toPcepSessionState()
                     .getPeerCapabilities().augmentation(StatefulCapabilitiesStatsAug.class);
             assertFalse(statefulCapabilities.getActive());
             assertTrue(statefulCapabilities.getInstantiation());
@@ -269,13 +270,13 @@ public class PCEPTopologySessionListenerTest extends AbstractPCEPSessionTest {
         // check stats
         checkEquals(() -> assertEquals(0, listenerState.getDelegatedLspsCount().intValue()));
         checkEquals(() -> assertTrue(listener.isSessionSynchronized()));
-        checkEquals(() -> assertTrue(listenerState.getMessages()
+        checkEquals(() -> assertTrue(listenerState.toPcepSessionState().getMessages()
                 .augmentation(StatefulMessagesStatsAug.class).getLastReceivedRptMsgTimestamp().toJava() > 0));
-        checkEquals(() -> assertEquals(4, listenerState.getMessages()
+        checkEquals(() -> assertEquals(4, listenerState.toPcepSessionState().getMessages()
                 .augmentation(StatefulMessagesStatsAug.class).getReceivedRptMsgCount().intValue()));
-        checkEquals(() -> assertEquals(2, listenerState.getMessages()
+        checkEquals(() -> assertEquals(2, listenerState.toPcepSessionState().getMessages()
                 .augmentation(StatefulMessagesStatsAug.class).getSentInitMsgCount().intValue()));
-        checkEquals(() -> assertEquals(1, listenerState.getMessages()
+        checkEquals(() -> assertEquals(1, listenerState.toPcepSessionState().getMessages()
                 .augmentation(StatefulMessagesStatsAug.class).getSentUpdMsgCount().intValue()));
     }