import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.FutureListener;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@GuardedBy("this")
SessionStateImpl listenerState;
+ // FIXME: clarify lifecycle rules of this map, most notably the interaction of multiple SrpIdNumbers
@GuardedBy("this")
private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
@GuardedBy("this")
final Metadata metadata) {
final var sendFuture = session.sendMessage(message);
listenerState.updateStatefulSentMsg(message);
- final PCEPRequest req = new PCEPRequest(metadata);
- requests.put(requestId, req);
+
final short rpcTimeout = serverSessionManager.getRpcTimeout();
LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+
+ final Timeout timeout;
if (rpcTimeout > 0) {
- setupTimeoutHandler(requestId, req, rpcTimeout);
+ // Note: the timeout is held back by us holding the 'this' monitor, which timeoutExpired re-acquires
+ timeout = serverSessionManager.timer().newTimeout(ignored -> timeoutExpired(requestId),
+ rpcTimeout, TimeUnit.SECONDS);
+ LOG.trace("Set up response timeout handler for request {}", requestId);
+ } else {
+ timeout = null;
}
- sendFuture.addListener((FutureListener<Void>) future -> {
- if (!future.isSuccess()) {
- synchronized (AbstractTopologySessionListener.this) {
- requests.remove(requestId);
- }
- req.cancel();
- LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
- } else {
- req.markUnacked();
- LOG.trace("Request {} sent to peer (object {})", requestId, req);
- }
- });
+ final PCEPRequest req = new PCEPRequest(metadata, timeout);
+ requests.put(requestId, req);
+ sendFuture.addListener(future -> sendCompleted(future, requestId, req));
return req.getFuture();
}
- private void setupTimeoutHandler(final SrpIdNumber requestId, final PCEPRequest req, final short timeout) {
- final Timer timer = req.getTimer();
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- synchronized (AbstractTopologySessionListener.this) {
- requests.remove(requestId);
- }
- req.cancel();
- LOG.info("Request {} timed-out waiting for response", requestId);
+ private void sendCompleted(final Future<?> future, final SrpIdNumber requestId, final PCEPRequest req) {
+ if (!future.isSuccess()) {
+ // FIXME: use concurrent operations and re-validate request vs. id
+ synchronized (AbstractTopologySessionListener.this) {
+ requests.remove(requestId);
}
- }, TimeUnit.SECONDS.toMillis(timeout));
- LOG.trace("Set up response timeout handler for request {}", requestId);
+ req.cancel();
+ LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
+ } else {
+ req.markUnacked();
+ LOG.trace("Request {} sent to peer (object {})", requestId, req);
+ }
+ }
+
+ private void timeoutExpired(final SrpIdNumber requestId) {
+ final PCEPRequest req;
+ synchronized (this) {
+ req = requests.remove(requestId);
+ }
+
+ if (req != null) {
+ LOG.info("Request {} timed-out waiting for response", requestId);
+ req.cancel();
+ }
}
/**
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.Timeout;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
-import java.util.Timer;
import java.util.concurrent.TimeUnit;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
private volatile State state = State.UNSENT;
// Guarded by state going to State.DONE
- private final Timer timer = new Timer();
+ private Timeout timeout;
- PCEPRequest(final Metadata metadata) {
+ PCEPRequest(final Metadata metadata, final Timeout timeout) {
this.metadata = metadata;
+ this.timeout = timeout;
}
protected ListenableFuture<OperationResult> getFuture() {
return metadata;
}
- Timer getTimer() {
- return timer;
- }
-
long getElapsedMillis() {
final long elapsedNanos = System.nanoTime() - startNanos;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
private void setFuture(final State prev, final OperationResult result) {
LOG.debug("Request went from {} to {}", prev, State.DONE);
- timer.cancel();
+ if (timeout != null) {
+ timeout.cancel();
+ timeout = null;
+ }
future.set(result);
}
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topology;
private final @NonNull PCEPTopologyProviderDependencies dependencies;
+ private final @NonNull HashedWheelTimer timer = new HashedWheelTimer();
@VisibleForTesting
final AtomicBoolean isClosed = new AtomicBoolean(false);
LOG.error("Session Manager has already been closed.");
return CommitInfo.emptyFluentFuture();
}
+
+ // Clean up sessions
for (final TopologySessionListener node : nodes.values()) {
node.close();
}
nodes.clear();
+
+ // Clean up remembered metadata
for (final TopologyNodeState topologyNodeState : state.values()) {
topologyNodeState.close();
}
state.clear();
+ // Stop the timer
+ final var cancelledTasks = timer.stop().size();
+ if (cancelledTasks != 0) {
+ LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
+ }
+
// Un-Register Pcep Topology into PCE Server
final PceServerProvider server = dependencies.getPceServerProvider();
if (server != null) {
.buildFuture();
}
+ final @NonNull Timer timer() {
+ return timer;
+ }
+
final short getRpcTimeout() {
return rpcTimeout;
}