Rework synchronization of SessionStateImpl 48/95948/5
authorOleksii Mozghovyi <oleksii.mozghovyi@pantheon.tech>
Mon, 26 Apr 2021 22:49:50 +0000 (01:49 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 10 May 2021 10:05:50 +0000 (12:05 +0200)
SessionStateImpl is a simple state tracker, but its lifecycle was
needlessly complex due to it not being inherently tied to its session.

Make sure SessionStateImpl is instantiated only when we have a session,
which makes a number of operations clearer: we no longer need to deal
with partial initialization and need to protect only internal state.

While we are at it, simplify all of that by keeping simple longs
instead of LongAdder -- and use saturatedOf(long) to convert these to
Uint objects, preventing possible runtime exceptions when overflows
occur.

JIRA: BGPCEP-920
Change-Id: I57b33c7dc33fe63500f5ad96a80d39889c6c5a67
Signed-off-by: Oleksii Mozghovyi <oleksii.mozghovyi@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPRequest.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/session/stats/SessionStateImpl.java

index 272205b72697dfed68a2b501f4a0fb5577fbdc0f..8b7c8b3d837e93f51bba811228f527023e1d085c 100644 (file)
@@ -83,6 +83,9 @@ import org.slf4j.LoggerFactory;
  * @param <L> identifier type for LSPs
  */
 public abstract class AbstractTopologySessionListener<S, L> implements TopologySessionListener, TopologySessionStats {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
+
+    static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
     static final MessageHeader MESSAGE_HEADER = new MessageHeader() {
         private final ProtocolVersion version = new ProtocolVersion(Uint8.ONE);
 
@@ -96,12 +99,12 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             return this.version;
         }
     };
-    static final String MISSING_XML_TAG = "Mandatory XML tags are missing.";
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologySessionListener.class);
+
     @GuardedBy("this")
     final Map<L, String> lsps = new HashMap<>();
     @GuardedBy("this")
-    final SessionStateImpl listenerState;
+    SessionStateImpl listenerState;
+
     @GuardedBy("this")
     private final Map<S, PCEPRequest> requests = new HashMap<>();
     @GuardedBy("this")
@@ -120,7 +123,6 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
 
     AbstractTopologySessionListener(final ServerSessionManager serverSessionManager) {
         this.serverSessionManager = requireNonNull(serverSessionManager);
-        this.listenerState = new SessionStateImpl(this);
     }
 
     @Override
@@ -178,7 +180,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
                 }
                 state.storeNode(topologyAugment,
                         new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), this.session);
-                this.listenerState.init(psession);
+
+                this.listenerState = new SessionStateImpl(this, psession);
                 this.serverSessionManager.bind(this.nodeState.getNodeId(), this.listenerState);
                 LOG.info("Session with {} attached to topology node {}", psession.getRemoteAddress(),
                         state.getNodeId());
@@ -240,6 +243,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
                     LOG.error("Session {} cannot be closed.", psession, e);
                 }
                 this.session = null;
+                this.listenerState = null;
                 this.syncOptimization = null;
 
                 // Clear all requests we know about
@@ -350,7 +354,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
 
     final synchronized PCEPRequest removeRequest(final S id) {
         final PCEPRequest ret = this.requests.remove(id);
-        if (ret != null) {
+        if (ret != null && this.listenerState != null) {
             this.listenerState.processRequestStats(ret.getElapsedMillis());
         }
         LOG.trace("Removed request {} object {}", id, ret);
@@ -559,9 +563,9 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
     @SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
     protected abstract boolean onMessage(MessageContext ctx, Message message);
 
+    @Holding("this")
     final String lookupLspName(final L id) {
-        requireNonNull(id, "ID parameter null.");
-        return this.lsps.get(id);
+        return this.lsps.get(requireNonNull(id, "ID parameter null."));
     }
 
     /**
index 3e6860b0a4ca834dce2316144d2ef132a4cbac4d..a48222827ce80514244beb0a16cdaa4d71c2795a 100644 (file)
@@ -98,7 +98,7 @@ final class PCEPRequest {
         final long elapsedNanos = this.stopwatch.elapsed().toNanos();
         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
         if (elapsedMillis == 0 && elapsedNanos > 0) {
-            return  MINIMUM_ELAPSED_TIME;
+            return MINIMUM_ELAPSED_TIME;
         }
         return elapsedMillis;
     }
index cf6b3c661eeb61703932459197be3c82a8216764..824f4c97ad51b0012e11ba558f354af4c4ccc5c7 100644 (file)
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.bgpcep.pcep.server.PathComputation;
 import org.opendaylight.bgpcep.pcep.server.PceServerProvider;
@@ -234,6 +235,7 @@ class PCEPTopologySessionListener extends AbstractTopologySessionListener<SrpIdN
         return srp.getOperationId();
     }
 
+    @Holding("this")
     private void markAllLspAsStale() {
         this.staleLsps.addAll(this.lsps.keySet());
     }
@@ -295,6 +297,7 @@ class PCEPTopologySessionListener extends AbstractTopologySessionListener<SrpIdN
         return true;
     }
 
+    @Holding("this")
     private boolean manageNextReport(final Reports report, final MessageContext ctx) {
         final Lsp lsp = report.getLsp();
         final PlspId plspid = lsp.getPlspId();
index 3e2cc294da045e8caa9f15dfd86e20fcefee2647..76619712e8b734e81275a8776c39853c9f194a52 100644 (file)
@@ -11,7 +11,9 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.protocol.pcep.PCEPSessionState;
 import org.opendaylight.protocol.util.StatisticsUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.sync.optimizations.rev200720.Tlvs3;
@@ -19,9 +21,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.initiated.rev200720.Pcinitiate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.Pcupd;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.PcepEntityIdStatsAugBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulCapabilitiesStatsAug;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulCapabilitiesStatsAugBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulMessagesStatsAug;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stateful.stats.rev181109.StatefulMessagesStatsAugBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
@@ -31,150 +31,144 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.sta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerCapabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerCapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.reply.time.grouping.ReplyTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.reply.time.grouping.ReplyTimeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
 import org.opendaylight.yangtools.yang.common.Uint16;
 import org.opendaylight.yangtools.yang.common.Uint32;
 
 public final class SessionStateImpl implements PcepSessionState {
-    private final LongAdder lastReceivedRptMsgTimestamp = new LongAdder();
-    private final LongAdder receivedRptMsgCount = new LongAdder();
-    private final LongAdder sentUpdMsgCount = new LongAdder();
-    private final LongAdder sentInitMsgCount = new LongAdder();
-    private final Stopwatch sessionUpDuration;
-
-    private final LongAdder minReplyTime = new LongAdder();
-    private final LongAdder maxReplyTime = new LongAdder();
-    private final LongAdder totalTime = new LongAdder();
-    private final LongAdder reqCount = new LongAdder();
+    private final Stopwatch sessionUpDuration = Stopwatch.createStarted();
     private final TopologySessionStats topologySessionStats;
-    private LocalPref localPref;
-    private PeerPref peerPref;
-    private PCEPSessionState pcepSessionState;
-
-    public SessionStateImpl(final TopologySessionStats topologySessionStats) {
-        this.sessionUpDuration = Stopwatch.createUnstarted();
+    private final PCEPSessionState session;
+    private final LocalPref localPref;
+    private final PeerPref peerPref;
+
+    @GuardedBy("this")
+    private long minReplyMillis;
+    @GuardedBy("this")
+    private long maxReplyMillis;
+    @GuardedBy("this")
+    private long totalReplyMillis;
+    @GuardedBy("this")
+    private long requestCount;
+
+    @GuardedBy("this")
+    private long receivedRptMessageCount;
+    @GuardedBy("this")
+    private long receivedRptMessageTime;
+
+    @GuardedBy("this")
+    private long sentUpdMessageCount;
+    @GuardedBy("this")
+    private long sentInitMessageCount;
+
+    public SessionStateImpl(final TopologySessionStats topologySessionStats, final PCEPSessionState session) {
         this.topologySessionStats = requireNonNull(topologySessionStats);
-    }
+        this.session = requireNonNull(session);
 
-    public synchronized void init(final PCEPSessionState session) {
-        requireNonNull(session);
-        this.pcepSessionState = session;
-        final Open localOpen = session.getLocalOpen();
+        final SpeakerEntityId entityId = extractEntityId(session.getLocalOpen());
+        localPref = entityId == null ? session.getLocalPref() : new LocalPrefBuilder(session.getLocalPref())
+            .addAugmentation(new PcepEntityIdStatsAugBuilder(entityId).build())
+            .build();
+        peerPref = session.getPeerPref();
+    }
 
-        if (localOpen.getTlvs() != null && localOpen.getTlvs().augmentation(Tlvs3.class) != null) {
-            final SpeakerEntityId entityId = localOpen.getTlvs().augmentation(Tlvs3.class).getSpeakerEntityId();
-            if (entityId != null) {
-                this.localPref = new LocalPrefBuilder(session.getLocalPref())
-                        .addAugmentation(new PcepEntityIdStatsAugBuilder(entityId).build()).build();
+    private static @Nullable SpeakerEntityId extractEntityId(final @NonNull Open localOpen) {
+        final Tlvs tlvs = localOpen.getTlvs();
+        if (tlvs != null) {
+            final Tlvs3 aug = tlvs.augmentation(Tlvs3.class);
+            if (aug != null) {
+                return aug.getSpeakerEntityId();
             }
-        } else {
-            this.localPref = session.getLocalPref();
         }
-
-        this.peerPref = session.getPeerPref();
-        this.sessionUpDuration.start();
+        return null;
     }
 
-    public synchronized void processRequestStats(final long duration) {
-        if (this.minReplyTime.longValue() == 0) {
-            this.minReplyTime.reset();
-            this.minReplyTime.add(duration);
-        } else if (duration < this.minReplyTime.longValue()) {
-            this.minReplyTime.reset();
-            this.minReplyTime.add(duration);
+    public synchronized void processRequestStats(final long durationMillis) {
+        if (minReplyMillis == 0 || durationMillis < minReplyMillis) {
+            minReplyMillis = durationMillis;
         }
-        if (duration > this.maxReplyTime.longValue()) {
-            this.maxReplyTime.reset();
-            this.maxReplyTime.add(duration);
+        if (durationMillis > maxReplyMillis) {
+            maxReplyMillis = durationMillis;
         }
-        this.totalTime.add(duration);
-        this.reqCount.increment();
+
+        requestCount++;
+        totalReplyMillis += durationMillis;
     }
 
     public synchronized void updateLastReceivedRptMsg() {
-        this.lastReceivedRptMsgTimestamp.reset();
-        this.lastReceivedRptMsgTimestamp.add(StatisticsUtil.getCurrentTimestampInSeconds());
-        this.receivedRptMsgCount.increment();
+        receivedRptMessageCount++;
+        receivedRptMessageTime = StatisticsUtil.getCurrentTimestampInSeconds();
     }
 
     public synchronized void updateStatefulSentMsg(final Message msg) {
         if (msg instanceof Pcinitiate) {
-            this.sentInitMsgCount.increment();
+            sentInitMessageCount++;
         } else if (msg instanceof Pcupd) {
-            this.sentUpdMsgCount.increment();
+            sentUpdMessageCount++;
         }
     }
 
     @Override
-    public synchronized String getSessionDuration() {
-        return StatisticsUtil.formatElapsedTime(this.sessionUpDuration.elapsed(TimeUnit.SECONDS));
+    public String getSessionDuration() {
+        return StatisticsUtil.formatElapsedTime(sessionUpDuration.elapsed(TimeUnit.SECONDS));
     }
 
     @Override
-    public synchronized Boolean getSynchronized() {
-        return this.topologySessionStats.isSessionSynchronized();
+    public Boolean getSynchronized() {
+        return topologySessionStats.isSessionSynchronized();
     }
 
     @Override
-    public synchronized PeerCapabilities getPeerCapabilities() {
+    public PeerCapabilities getPeerCapabilities() {
         return new PeerCapabilitiesBuilder()
-                .addAugmentation(createStatefulCapabilities())
-                .build();
-    }
-
-    private StatefulCapabilitiesStatsAug createStatefulCapabilities() {
-        return new StatefulCapabilitiesStatsAugBuilder()
-                .setActive(this.topologySessionStats.isLspUpdateCapability())
-                .setInstantiation(this.topologySessionStats.isInitiationCapability())
-                .setStateful(this.topologySessionStats.isStatefulCapability())
-                .build();
+            .addAugmentation(new StatefulCapabilitiesStatsAugBuilder()
+                .setActive(topologySessionStats.isLspUpdateCapability())
+                .setInstantiation(topologySessionStats.isInitiationCapability())
+                .setStateful(topologySessionStats.isStatefulCapability())
+                .build())
+            .build();
     }
 
     @Override
     public Messages getMessages() {
-        return new MessagesBuilder(this.pcepSessionState.getMessages())
-                .setReplyTime(setReplyTime())
-                .addAugmentation(createStatefulMessages())
-                .build();
-    }
-
-    private StatefulMessagesStatsAug createStatefulMessages() {
-        return new StatefulMessagesStatsAugBuilder()
-                .setLastReceivedRptMsgTimestamp(Uint32.valueOf(this.lastReceivedRptMsgTimestamp.longValue()))
-                .setReceivedRptMsgCount(Uint32.valueOf(this.receivedRptMsgCount.longValue()))
-                .setSentInitMsgCount(Uint32.valueOf(this.sentInitMsgCount.longValue()))
-                .setSentUpdMsgCount(Uint32.valueOf(this.sentUpdMsgCount.longValue()))
+        // Note: callout to session, do not hold lock
+        final Messages sessionMessages = session.getMessages();
+
+        synchronized (this) {
+            final long averageReply = requestCount == 0 ? 0 : Math.round((double) totalReplyMillis / requestCount);
+
+            return new MessagesBuilder(sessionMessages)
+                .setReplyTime(new ReplyTimeBuilder()
+                    .setAverageTime(Uint32.saturatedOf(averageReply))
+                    .setMaxTime(Uint32.saturatedOf(maxReplyMillis))
+                    .setMinTime(Uint32.saturatedOf(minReplyMillis))
+                    .build())
+                .addAugmentation(new StatefulMessagesStatsAugBuilder()
+                    .setLastReceivedRptMsgTimestamp(Uint32.saturatedOf(receivedRptMessageTime))
+                    .setReceivedRptMsgCount(Uint32.saturatedOf(receivedRptMessageCount))
+                    .setSentInitMsgCount(Uint32.saturatedOf(sentInitMessageCount))
+                    .setSentUpdMsgCount(Uint32.saturatedOf(sentUpdMessageCount))
+                    .build())
                 .build();
-    }
-
-    private synchronized ReplyTime setReplyTime() {
-        long avg = 0;
-        if (this.reqCount.longValue() != 0) {
-            avg = Math.round((double) this.totalTime.longValue() / this.reqCount.longValue());
         }
-        return new ReplyTimeBuilder()
-                .setAverageTime(Uint32.valueOf(avg))
-                .setMaxTime(Uint32.valueOf(this.maxReplyTime.longValue()))
-                .setMinTime(Uint32.valueOf(this.minReplyTime.longValue()))
-                .build();
     }
 
     @Override
     public LocalPref getLocalPref() {
-        return this.localPref;
+        return localPref;
     }
 
     @Override
     public PeerPref getPeerPref() {
-        return this.peerPref;
+        return peerPref;
     }
 
     @Override
     public Uint16 getDelegatedLspsCount() {
-        return Uint16.valueOf(this.topologySessionStats.getDelegatedLspsCount());
+        return Uint16.saturatedOf(topologySessionStats.getDelegatedLspsCount());
     }
 
     @Override