// Clear all requests we know about
for (final Entry<SrpIdNumber, PCEPRequest> e : requests.entrySet()) {
- final PCEPRequest r = e.getValue();
- switch (r.getState()) {
+ // FIXME: exhaustive when we have JDK17+
+ switch (e.getValue().cancel()) {
case DONE:
// Done is done, nothing to do
LOG.trace("Request {} was done when session went down.", e.getKey());
// Peer has not acked: results in failure
LOG.info("Request {} was incomplete when session went down, failing the instruction",
e.getKey());
- r.done(OperationResults.NOACK);
break;
case UNSENT:
// Peer has not been sent to the peer: results in cancellation
LOG.debug("Request {} was not sent when session went down, cancelling the instruction",
e.getKey());
- r.done(OperationResults.UNSENT);
break;
default:
break;
synchronized (AbstractTopologySessionListener.this) {
requests.remove(requestId);
}
- req.done(OperationResults.UNSENT);
+ req.cancel();
LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
} else {
- req.sent();
+ req.markUnacked();
LOG.trace("Request {} sent to peer (object {})", requestId, req);
}
});
synchronized (AbstractTopologySessionListener.this) {
requests.remove(requestId);
}
- req.done();
+ req.cancel();
LOG.info("Request {} timed-out waiting for response", requestId);
}
}, TimeUnit.SECONDS.toMillis(timeout));
private void notifyRequests() {
for (final PCEPRequest r : requests) {
- r.done(OperationResults.SUCCESS);
+ r.finish(OperationResults.SUCCESS);
}
}
}
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
-import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class PCEPRequest {
-
- private static final Logger LOG = LoggerFactory.getLogger(PCEPRequest.class);
-
- private static final long MINIMUM_ELAPSED_TIME = 1;
-
+ /**
+ * Logical state of a {@link PCEPRequest}.
+ */
enum State {
+ /**
+ * The request has not been written out to the session.
+ */
UNSENT,
+ /**
+ * The request has been sent to to the sesssion, but has not been acknowledged by the peer.
+ */
UNACKED,
+ /**
+ * The request has been completed.
+ */
DONE,
}
- private final SettableFuture<OperationResult> future;
+ private static final Logger LOG = LoggerFactory.getLogger(PCEPRequest.class);
+ private static final long MINIMUM_ELAPSED_TIME = 1;
+ private static final VarHandle STATE;
+
+ static {
+ try {
+ STATE = MethodHandles.lookup().findVarHandle(PCEPRequest.class, "state", State.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ private final SettableFuture<OperationResult> future = SettableFuture.create();
+ private final Stopwatch stopwatch = Stopwatch.createStarted();
private final Metadata metadata;
- private volatile State state;
- @GuardedBy("this")
- private final Stopwatch stopwatch;
- private final Timer timer;
+
+ // Manipulated via STATE
+ @SuppressWarnings("unused")
+ private volatile State state = State.UNSENT;
+
+ // Guarded by state going to State.DONE
+ private final Timer timer = new Timer();
PCEPRequest(final Metadata metadata) {
- this.future = SettableFuture.create();
this.metadata = metadata;
- this.state = State.UNSENT;
- this.stopwatch = Stopwatch.createStarted();
- this.timer = new Timer();
}
protected ListenableFuture<OperationResult> getFuture() {
- return this.future;
+ return future;
}
public Metadata getMetadata() {
- return this.metadata;
+ return metadata;
}
- public State getState() {
- return this.state;
+ Timer getTimer() {
+ return timer;
}
- Timer getTimer() {
- return this.timer;
+ long getElapsedMillis() {
+ final long elapsedNanos = stopwatch.elapsed().toNanos();
+ final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+ if (elapsedMillis == 0 && elapsedNanos > 0) {
+ return MINIMUM_ELAPSED_TIME;
+ }
+ return elapsedMillis;
+ }
+
+ /**
+ * Mark this request as {@link State#UNACKED} if it currently is {@link State#UNSENT}.
+ */
+ void markUnacked() {
+ if (STATE.compareAndSet(this, State.UNSENT, State.UNACKED)) {
+ LOG.debug("Request went from {} to {}", State.UNSENT, State.UNACKED);
+ }
}
- synchronized void done(final OperationResult result) {
- if (this.state != State.DONE) {
- LOG.debug("Request went from {} to {}", this.state, State.DONE);
- this.state = State.DONE;
- this.timer.cancel();
- this.future.set(result);
+ /**
+ * Mark this request as {@link State#DONE} with specified {@link OperationResult}. If it is already done, this
+ * method does nothing.
+ *
+ * @param result Result to report
+ */
+ void finish(final OperationResult result) {
+ final var prev = setDone();
+ if (prev != State.DONE) {
+ setFuture(prev, result);
}
}
- synchronized void done() {
- OperationResult result;
- switch (this.state) {
+ /**
+ * Mark this request as {@link State#DONE} with a result derived from its current state. If it is already done, this
+ * method does nothing.
+ *
+ * @return Previous state
+ */
+ State cancel() {
+ final var prev = setDone();
+ // FIXME: exhaustive when we have JDK17+
+ switch (prev) {
case UNSENT:
- result = OperationResults.UNSENT;
+ setFuture(prev, OperationResults.UNSENT);
break;
case UNACKED:
- result = OperationResults.NOACK;
+ setFuture(prev, OperationResults.NOACK);
break;
case DONE:
- return;
+ // No-op
+ break;
default:
- return;
+ throw new IllegalStateException("Unhandled state " + prev);
}
- done(result);
+ return prev;
}
- synchronized void sent() {
- if (this.state == State.UNSENT) {
- LOG.debug("Request went from {} to {}", this.state, State.UNACKED);
- this.state = State.UNACKED;
- }
+ private State setDone() {
+ return (State) STATE.getAndSet(this, State.DONE);
}
- synchronized long getElapsedMillis() {
- final long elapsedNanos = this.stopwatch.elapsed().toNanos();
- final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
- if (elapsedMillis == 0 && elapsedNanos > 0) {
- return MINIMUM_ELAPSED_TIME;
- }
- return elapsedMillis;
+ private void setFuture(final State prev, final OperationResult result) {
+ LOG.debug("Request went from {} to {}", prev, State.DONE);
+ timer.cancel();
+ future.set(result);
}
}