Use atomic state transitions in PCEPRequest 06/100706/3
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 21 Apr 2022 13:58:08 +0000 (15:58 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 21 Apr 2022 17:17:20 +0000 (19:17 +0200)
PCEPRequest is has only three logical states it can go to and performs
callouts only once it's done. Use a VarHandle and its atomic operations
to perform transitions between these states.

Change-Id: I8b858b129da96a17499ff68ec90a04b83faab490
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPRequest.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologySessionListener.java

index 6190db4f5c347a6a180614146d15b4e8888fa8a6..c88e4587047c98f9c71861c5c68f1d10fbc14b97 100644 (file)
@@ -282,8 +282,8 @@ public abstract class AbstractTopologySessionListener implements TopologySession
 
                 // Clear all requests we know about
                 for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
-                    final PCEPRequest r = e.getValue();
-                    switch (r.getState()) {
+                    // FIXME: exhaustive when we have JDK17+
+                    switch (e.getValue().cancel()) {
                         case DONE:
                             // Done is done, nothing to do
                             LOG.trace("Request {} was done when session went down.", e.getKey());
@@ -292,13 +292,11 @@ public abstract class AbstractTopologySessionListener implements TopologySession
                             // Peer has not acked: results in failure
                             LOG.info("Request {} was incomplete when session went down, failing the instruction",
                                     e.getKey());
-                            r.done(OperationResults.NOACK);
                             break;
                         case UNSENT:
                             // Peer has not been sent to the peer: results in cancellation
                             LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
                                     e.getKey());
-                            r.done(OperationResults.UNSENT);
                             break;
                         default:
                             break;
@@ -421,10 +419,10 @@ public abstract class AbstractTopologySessionListener implements TopologySession
                 synchronized (AbstractTopologySessionListener.this) {
                     requests.remove(requestId);
                 }
-                req.done(OperationResults.UNSENT);
+                req.cancel();
                 LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
             } else {
-                req.sent();
+                req.markUnacked();
                 LOG.trace("Request {} sent to peer (object {})", requestId, req);
             }
         });
@@ -440,7 +438,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
                 synchronized (AbstractTopologySessionListener.this) {
                     requests.remove(requestId);
                 }
-                req.done();
+                req.cancel();
                 LOG.info("Request {} timed-out waiting for response", requestId);
             }
         }, TimeUnit.SECONDS.toMillis(timeout));
@@ -690,7 +688,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
 
         private void notifyRequests() {
             for (final PCEPRequest r : requests) {
-                r.done(OperationResults.SUCCESS);
+                r.finish(OperationResults.SUCCESS);
             }
         }
     }
index a48222827ce80514244beb0a16cdaa4d71c2795a..4e117736cd209e8036f29b7873a9d218d7521a29 100644 (file)
@@ -10,96 +10,136 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
-import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class PCEPRequest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PCEPRequest.class);
-
-    private static final long MINIMUM_ELAPSED_TIME = 1;
-
+    /**
+     * Logical state of a {@link PCEPRequest}.
+     */
     enum State {
+        /**
+         * The request has not been written out to the session.
+         */
         UNSENT,
+        /**
+         * The request has been sent to to the sesssion, but has not been acknowledged by the peer.
+         */
         UNACKED,
+        /**
+         * The request has been completed.
+         */
         DONE,
     }
 
-    private final SettableFuture<OperationResult> future;
+    private static final Logger LOG = LoggerFactory.getLogger(PCEPRequest.class);
+    private static final long MINIMUM_ELAPSED_TIME = 1;
+    private static final VarHandle STATE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(PCEPRequest.class, "state", State.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private final SettableFuture<OperationResult> future = SettableFuture.create();
+    private final Stopwatch stopwatch = Stopwatch.createStarted();
     private final Metadata metadata;
-    private volatile State state;
-    @GuardedBy("this")
-    private final Stopwatch stopwatch;
-    private final Timer timer;
+
+    // Manipulated via STATE
+    @SuppressWarnings("unused")
+    private volatile State state = State.UNSENT;
+
+    // Guarded by state going to State.DONE
+    private final Timer timer = new Timer();
 
     PCEPRequest(final Metadata metadata) {
-        this.future = SettableFuture.create();
         this.metadata = metadata;
-        this.state = State.UNSENT;
-        this.stopwatch = Stopwatch.createStarted();
-        this.timer = new Timer();
     }
 
     protected ListenableFuture<OperationResult> getFuture() {
-        return this.future;
+        return future;
     }
 
     public Metadata getMetadata() {
-        return this.metadata;
+        return metadata;
     }
 
-    public State getState() {
-        return this.state;
+    Timer getTimer() {
+        return timer;
     }
 
-    Timer getTimer() {
-        return this.timer;
+    long getElapsedMillis() {
+        final long elapsedNanos = stopwatch.elapsed().toNanos();
+        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+        if (elapsedMillis == 0 && elapsedNanos > 0) {
+            return MINIMUM_ELAPSED_TIME;
+        }
+        return elapsedMillis;
+    }
+
+    /**
+     * Mark this request as {@link State#UNACKED} if it currently is {@link State#UNSENT}.
+     */
+    void markUnacked() {
+        if (STATE.compareAndSet(this, State.UNSENT, State.UNACKED)) {
+            LOG.debug("Request went from {} to {}", State.UNSENT, State.UNACKED);
+        }
     }
 
-    synchronized void done(final OperationResult result) {
-        if (this.state != State.DONE) {
-            LOG.debug("Request went from {} to {}", this.state, State.DONE);
-            this.state = State.DONE;
-            this.timer.cancel();
-            this.future.set(result);
+    /**
+     * Mark this request as {@link State#DONE} with specified {@link OperationResult}. If it is already done, this
+     * method does nothing.
+     *
+     * @param result Result to report
+     */
+    void finish(final OperationResult result) {
+        final var prev = setDone();
+        if (prev != State.DONE) {
+            setFuture(prev, result);
         }
     }
 
-    synchronized void done() {
-        OperationResult result;
-        switch (this.state) {
+    /**
+     * Mark this request as {@link State#DONE} with a result derived from its current state. If it is already done, this
+     * method does nothing.
+     *
+     * @return Previous state
+     */
+    State cancel() {
+        final var prev = setDone();
+        // FIXME: exhaustive when we have JDK17+
+        switch (prev) {
             case UNSENT:
-                result = OperationResults.UNSENT;
+                setFuture(prev, OperationResults.UNSENT);
                 break;
             case UNACKED:
-                result = OperationResults.NOACK;
+                setFuture(prev, OperationResults.NOACK);
                 break;
             case DONE:
-                return;
+                // No-op
+                break;
             default:
-                return;
+                throw new IllegalStateException("Unhandled state " + prev);
         }
-        done(result);
+        return prev;
     }
 
-    synchronized void sent() {
-        if (this.state == State.UNSENT) {
-            LOG.debug("Request went from {} to {}", this.state, State.UNACKED);
-            this.state = State.UNACKED;
-        }
+    private State setDone() {
+        return (State) STATE.getAndSet(this, State.DONE);
     }
 
-    synchronized long getElapsedMillis() {
-        final long elapsedNanos = this.stopwatch.elapsed().toNanos();
-        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
-        if (elapsedMillis == 0 && elapsedNanos > 0) {
-            return MINIMUM_ELAPSED_TIME;
-        }
-        return elapsedMillis;
+    private void setFuture(final State prev, final OperationResult result) {
+        LOG.debug("Request went from {} to {}", prev, State.DONE);
+        timer.cancel();
+        future.set(result);
     }
 }
index 99faa97313d8308a01aca67554fbf828a4c15c96..e8dd710c3ef379f1e563818c06fb18d3a279fb01 100644 (file)
@@ -222,7 +222,7 @@ class PCEPTopologySessionListener extends AbstractTopologySessionListener {
                 if (!SRPID_ZERO.equals(id)) {
                     final PCEPRequest req = removeRequest(id);
                     if (req != null) {
-                        req.done(OperationResults.createFailed(errMsg.getErrors()));
+                        req.finish(OperationResults.createFailed(errMsg.getErrors()));
                     } else {
                         LOG.warn("Request ID {} not found in outstanding DB", id);
                     }