Replace per-request j.u.Timer with a Netty Timer 05/100705/6
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 17:20:05 +0000 (19:20 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 18:49:23 +0000 (20:49 +0200)
We are currently spawning a full java.util.Timer for every request we are
sending out. That is rather wasteful in terms of resources: each such
timer costs us a thread.

Rather than doing that, use Netty's HashedWheelTimer, which can easily
cope with a large number of timeouts.

JIRA: BGPCEP-1006
Change-Id: I4f3b6bf6a435170c8a054aa0e012418c60b17e96
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPRequest.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologySessionListener.java

index 165781918c387151db7166344177dde5bdc7cab9..207ec58b643fd96eb814b2770bf5978b2c80b269 100644 (file)
@@ -15,7 +15,8 @@ import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.FutureListener;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -25,8 +26,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -96,6 +95,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
     @GuardedBy("this")
     SessionStateImpl listenerState;
 
+    // FIXME: clarify lifecycle rules of this map, most notably the interaction of multiple SrpIdNumbers
     @GuardedBy("this")
     private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
     @GuardedBy("this")
@@ -414,43 +414,51 @@ public abstract class AbstractTopologySessionListener implements TopologySession
             final Metadata metadata) {
         final var sendFuture = session.sendMessage(message);
         listenerState.updateStatefulSentMsg(message);
-        final PCEPRequest req = new PCEPRequest(metadata);
-        requests.put(requestId, req);
+
         final short rpcTimeout = serverSessionManager.getRpcTimeout();
         LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+
+        final Timeout timeout;
         if (rpcTimeout > 0) {
-            setupTimeoutHandler(requestId, req, rpcTimeout);
+            // Note: the timeout is held back by us holding the 'this' monitor, which timeoutExpired re-acquires
+            timeout = serverSessionManager.timer().newTimeout(ignored -> timeoutExpired(requestId),
+                rpcTimeout, TimeUnit.SECONDS);
+            LOG.trace("Set up response timeout handler for request {}", requestId);
+        } else {
+            timeout = null;
         }
 
-        sendFuture.addListener((FutureListener<Void>) future -> {
-            if (!future.isSuccess()) {
-                synchronized (AbstractTopologySessionListener.this) {
-                    requests.remove(requestId);
-                }
-                req.cancel();
-                LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
-            } else {
-                req.markUnacked();
-                LOG.trace("Request {} sent to peer (object {})", requestId, req);
-            }
-        });
+        final PCEPRequest req = new PCEPRequest(metadata, timeout);
+        requests.put(requestId, req);
 
+        sendFuture.addListener(future -> sendCompleted(future, requestId, req));
         return req.getFuture();
     }
 
-    private void setupTimeoutHandler(final SrpIdNumber requestId, final PCEPRequest req, final short timeout) {
-        final Timer timer = req.getTimer();
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                synchronized (AbstractTopologySessionListener.this) {
-                    requests.remove(requestId);
-                }
-                req.cancel();
-                LOG.info("Request {} timed-out waiting for response", requestId);
+    private void sendCompleted(final Future<?> future, final SrpIdNumber requestId, final PCEPRequest req) {
+        if (!future.isSuccess()) {
+            // FIXME: use concurrent operations and re-validate request vs. id
+            synchronized (AbstractTopologySessionListener.this) {
+                requests.remove(requestId);
             }
-        }, TimeUnit.SECONDS.toMillis(timeout));
-        LOG.trace("Set up response timeout handler for request {}", requestId);
+            req.cancel();
+            LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
+        } else {
+            req.markUnacked();
+            LOG.trace("Request {} sent to peer (object {})", requestId, req);
+        }
+    }
+
+    private void timeoutExpired(final SrpIdNumber requestId) {
+        final PCEPRequest req;
+        synchronized (this) {
+            req = requests.remove(requestId);
+        }
+
+        if (req != null) {
+            LOG.info("Request {} timed-out waiting for response", requestId);
+            req.cancel();
+        }
     }
 
     /**
index c2c3128a0d36994bcf8f1fbd0936b0a9c1c3a494..99a40efcd8b97f6b5124e4fc16f3011c7afc4fd1 100644 (file)
@@ -9,9 +9,9 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.Timeout;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
-import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
@@ -57,10 +57,11 @@ final class PCEPRequest {
     private volatile State state = State.UNSENT;
 
     // Guarded by state going to State.DONE
-    private final Timer timer = new Timer();
+    private Timeout timeout;
 
-    PCEPRequest(final Metadata metadata) {
+    PCEPRequest(final Metadata metadata, final Timeout timeout) {
         this.metadata = metadata;
+        this.timeout = timeout;
     }
 
     protected ListenableFuture<OperationResult> getFuture() {
@@ -71,10 +72,6 @@ final class PCEPRequest {
         return metadata;
     }
 
-    Timer getTimer() {
-        return timer;
-    }
-
     long getElapsedMillis() {
         final long elapsedNanos = System.nanoTime() - startNanos;
         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
@@ -136,7 +133,10 @@ final class PCEPRequest {
 
     private void setFuture(final State prev, final OperationResult result) {
         LOG.debug("Request went from {} to {}", prev, State.DONE);
-        timer.cancel();
+        if (timeout != null) {
+            timeout.cancel();
+            timeout = null;
+        }
         future.set(result);
     }
 }
index 20440f3a78e94a20502528b97f299b68980d94ec..972713afad40b6a8402cda5a3b75af4af7a3e36b 100644 (file)
@@ -15,6 +15,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
@@ -61,6 +63,7 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
 
     private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topology;
     private final @NonNull PCEPTopologyProviderDependencies dependencies;
+    private final @NonNull HashedWheelTimer timer = new HashedWheelTimer();
 
     @VisibleForTesting
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -123,15 +126,25 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
             LOG.error("Session Manager has already been closed.");
             return CommitInfo.emptyFluentFuture();
         }
+
+        // Clean up sessions
         for (final TopologySessionListener node : nodes.values()) {
             node.close();
         }
         nodes.clear();
+
+        // Clean up remembered metadata
         for (final TopologyNodeState topologyNodeState : state.values()) {
             topologyNodeState.close();
         }
         state.clear();
 
+        // Stop the timer
+        final var cancelledTasks = timer.stop().size();
+        if (cancelledTasks != 0) {
+            LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
+        }
+
         // Un-Register Pcep Topology into PCE Server
         final PceServerProvider server = dependencies.getPceServerProvider();
         if (server != null) {
@@ -260,6 +273,10 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
             .buildFuture();
     }
 
+    final @NonNull Timer timer() {
+        return timer;
+    }
+
     final short getRpcTimeout() {
         return rpcTimeout;
     }
index c7d0272a8511008ede0c4bf0248ab41b05a0e4c1..a1afe4475c8ded6f4e9b2112b99cada940caf11d 100644 (file)
@@ -10,6 +10,5 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 
 interface TopologySessionListener extends PCEPSessionListener, TopologySessionRPCs {
-    // FIXME: this needs to provide a future which completes when everything is cleaned up
     void close();
 }
\ No newline at end of file