Replace per-request j.u.Timer with a Netty Timer 47/100747/1
author Robert Varga <robert.varga@pantheon.tech>
Fri, 22 Apr 2022 17:20:05 +0000 (19:20 +0200)
committer Robert Varga <robert.varga@pantheon.tech>
Sat, 23 Apr 2022 04:35:35 +0000 (06:35 +0200)
We are currently spawning a full java.util.Timer for every request we are
sending out. That is rather wasteful in terms of resources: each such
timer costs us a thread.

Rather than doing that, use Netty's HashedWheelTimer, which can easily
cope with a large number of timeouts.

JIRA: BGPCEP-1006
Change-Id: I4f3b6bf6a435170c8a054aa0e012418c60b17e96
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit a4cf3c1ad99445d6e9aa7fe1ef5afd4c28a4a0e4)

pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPRequest.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologySessionListener.java

index a206ffed90e15bbd489889e79dd95b2d1e8545e2..70f3f4e3868a1291b584d05ee9a94c0845824de5 100644 (file)
@@ -16,7 +16,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.netty.util.concurrent.FutureListener;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,8 +27,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,6 +96,7 @@ public abstract class AbstractTopologySessionListener implements TopologySession
     @GuardedBy("this")
     SessionStateImpl listenerState;
 
+    // FIXME: clarify lifecycle rules of this map, most notably the interaction of multiple SrpIdNumbers
     @GuardedBy("this")
     private final Map<SrpIdNumber, PCEPRequest> requests = new HashMap<>();
     @GuardedBy("this")
@@ -415,43 +415,51 @@ public abstract class AbstractTopologySessionListener implements TopologySession
             final Metadata metadata) {
         final var sendFuture = session.sendMessage(message);
         listenerState.updateStatefulSentMsg(message);
-        final PCEPRequest req = new PCEPRequest(metadata);
-        requests.put(requestId, req);
+
         final short rpcTimeout = serverSessionManager.getRpcTimeout();
         LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
+
+        final Timeout timeout;
         if (rpcTimeout > 0) {
-            setupTimeoutHandler(requestId, req, rpcTimeout);
+            // Note: the timeout is held back by us holding the 'this' monitor, which timeoutExpired re-acquires
+            timeout = serverSessionManager.timer().newTimeout(ignored -> timeoutExpired(requestId),
+                rpcTimeout, TimeUnit.SECONDS);
+            LOG.trace("Set up response timeout handler for request {}", requestId);
+        } else {
+            timeout = null;
         }
 
-        sendFuture.addListener((FutureListener<Void>) future -> {
-            if (!future.isSuccess()) {
-                synchronized (AbstractTopologySessionListener.this) {
-                    requests.remove(requestId);
-                }
-                req.cancel();
-                LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
-            } else {
-                req.markUnacked();
-                LOG.trace("Request {} sent to peer (object {})", requestId, req);
-            }
-        });
+        final PCEPRequest req = new PCEPRequest(metadata, timeout);
+        requests.put(requestId, req);
 
+        sendFuture.addListener(future -> sendCompleted(future, requestId, req));
         return req.getFuture();
     }
 
-    private void setupTimeoutHandler(final SrpIdNumber requestId, final PCEPRequest req, final short timeout) {
-        final Timer timer = req.getTimer();
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                synchronized (AbstractTopologySessionListener.this) {
-                    requests.remove(requestId);
-                }
-                req.cancel();
-                LOG.info("Request {} timed-out waiting for response", requestId);
+    private void sendCompleted(final Future<?> future, final SrpIdNumber requestId, final PCEPRequest req) {
+        if (!future.isSuccess()) {
+            // FIXME: use concurrent operations and re-validate request vs. id
+            synchronized (AbstractTopologySessionListener.this) {
+                requests.remove(requestId);
             }
-        }, TimeUnit.SECONDS.toMillis(timeout));
-        LOG.trace("Set up response timeout handler for request {}", requestId);
+            req.cancel();
+            LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
+        } else {
+            req.markUnacked();
+            LOG.trace("Request {} sent to peer (object {})", requestId, req);
+        }
+    }
+
+    private void timeoutExpired(final SrpIdNumber requestId) {
+        final PCEPRequest req;
+        synchronized (this) {
+            req = requests.remove(requestId);
+        }
+
+        if (req != null) {
+            LOG.info("Request {} timed-out waiting for response", requestId);
+            req.cancel();
+        }
     }
 
     /**
index 4e117736cd209e8036f29b7873a9d218d7521a29..6d85edfa3236d231d8996e873c12d8c0b39183d1 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.Timeout;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
-import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.OperationResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.lsp.metadata.Metadata;
@@ -59,10 +59,11 @@ final class PCEPRequest {
     private volatile State state = State.UNSENT;
 
     // Guarded by state going to State.DONE
-    private final Timer timer = new Timer();
+    private Timeout timeout;
 
-    PCEPRequest(final Metadata metadata) {
+    PCEPRequest(final Metadata metadata, final Timeout timeout) {
         this.metadata = metadata;
+        this.timeout = timeout;
     }
 
     protected ListenableFuture<OperationResult> getFuture() {
@@ -73,10 +74,6 @@ final class PCEPRequest {
         return metadata;
     }
 
-    Timer getTimer() {
-        return timer;
-    }
-
     long getElapsedMillis() {
         final long elapsedNanos = stopwatch.elapsed().toNanos();
         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
@@ -139,7 +136,10 @@ final class PCEPRequest {
 
     private void setFuture(final State prev, final OperationResult result) {
         LOG.debug("Request went from {} to {}", prev, State.DONE);
-        timer.cancel();
+        if (timeout != null) {
+            timeout.cancel();
+            timeout = null;
+        }
         future.set(result);
     }
 }
index 88c769376c50cc1d0db546699d5eef829e0c38ba..ffa7ab98df1eae105f057935778f22c79cf15fe1 100644 (file)
@@ -15,6 +15,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
@@ -60,6 +62,7 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
 
     private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topology;
     private final @NonNull PCEPTopologyProviderDependencies dependencies;
+    private final @NonNull HashedWheelTimer timer = new HashedWheelTimer();
 
     @VisibleForTesting
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -115,15 +118,25 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
             LOG.error("Session Manager has already been closed.");
             return CommitInfo.emptyFluentFuture();
         }
+
+        // Clean up sessions
         for (final TopologySessionListener node : nodes.values()) {
             node.close();
         }
         nodes.clear();
+
+        // Clean up remembered metadata
         for (final TopologyNodeState topologyNodeState : state.values()) {
             topologyNodeState.close();
         }
         state.clear();
 
+        // Stop the timer
+        final var cancelledTasks = timer.stop().size();
+        if (cancelledTasks != 0) {
+            LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
+        }
+
         final WriteTransaction t = dependencies.getDataBroker().newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, topology);
         final FluentFuture<? extends CommitInfo> future = t.commit();
@@ -246,6 +259,10 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
             .buildFuture();
     }
 
+    final @NonNull Timer timer() {
+        return timer;
+    }
+
     final short getRpcTimeout() {
         return rpcTimeout;
     }
index c7d0272a8511008ede0c4bf0248ab41b05a0e4c1..a1afe4475c8ded6f4e9b2112b99cada940caf11d 100644 (file)
@@ -10,6 +10,5 @@ package org.opendaylight.bgpcep.pcep.topology.provider;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 
 interface TopologySessionListener extends PCEPSessionListener, TopologySessionRPCs {
-    // FIXME: this needs to provide a future which completes when everything is cleaned up
     void close();
 }
\ No newline at end of file