Replace per-request j.u.Timer with a Netty Timer
[bgpcep.git] / pcep / topology / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / ServerSessionManager.java
index 05d7b99b9d69564eda33f528f0b60ebbee25f01a..972713afad40b6a8402cda5a3b75af4af7a3e36b 100644 (file)
@@ -15,6 +15,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
@@ -24,12 +26,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.bgpcep.pcep.server.PceServerProvider;
-import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.graph.rev191125.graph.topology.GraphKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.AddLspArgs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.EnsureLspOperationalInput;
@@ -49,19 +50,20 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypesBuilder;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // Non-final for testing
-class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs, TopologySessionStatsRegistry {
+class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs {
     private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class);
     private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5);
 
     private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topology;
     private final @NonNull PCEPTopologyProviderDependencies dependencies;
+    private final @NonNull HashedWheelTimer timer = new HashedWheelTimer();
 
     @VisibleForTesting
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -72,17 +74,19 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
 
     private volatile short rpcTimeout;
 
+    private final GraphKey graphKey;
+
     ServerSessionManager(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
-            final PCEPTopologyProviderDependencies dependencies, final short rpcTimeout) {
+            final PCEPTopologyProviderDependencies dependencies, final short rpcTimeout, final GraphKey graphKey) {
         this.dependencies = requireNonNull(dependencies);
         topology = requireNonNull(instanceIdentifier);
         this.rpcTimeout = rpcTimeout;
+        this.graphKey = requireNonNull(graphKey);
     }
 
     // Initialize the operational view of the topology.
     final ListenableFuture<Boolean> start() {
         LOG.info("Creating PCEP Topology {}", topologyId());
-
         final var tx = dependencies.getDataBroker().newWriteOnlyTransaction();
         tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder()
             .withKey(topology.getKey())
@@ -112,7 +116,7 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
         // Register this new topology to PCE Server
         final PceServerProvider server = dependencies.getPceServerProvider();
         if (server != null) {
-            server.registerPcepTopology(topology);
+            server.registerPcepTopology(topology, graphKey);
         }
         return future;
     }
@@ -122,15 +126,25 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
             LOG.error("Session Manager has already been closed.");
             return CommitInfo.emptyFluentFuture();
         }
+
+        // Clean up sessions
         for (final TopologySessionListener node : nodes.values()) {
             node.close();
         }
         nodes.clear();
+
+        // Clean up remembered metadata
         for (final TopologyNodeState topologyNodeState : state.values()) {
             topologyNodeState.close();
         }
         state.clear();
 
+        // Stop the timer
+        final var cancelledTasks = timer.stop().size();
+        if (cancelledTasks != 0) {
+            LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
+        }
+
         // Un-Register Pcep Topology into PCE Server
         final PceServerProvider server = dependencies.getPceServerProvider();
         if (server != null) {
@@ -154,13 +168,13 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
         return future;
     }
 
-    final synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session,
+    final synchronized void releaseNodeState(final TopologyNodeState nodeState, final InetAddress peerAddress,
             final boolean persistNode) {
         if (isClosed.get()) {
             LOG.error("Session Manager has already been closed.");
             return;
         }
-        final NodeId nodeId = createNodeId(session.getRemoteAddress());
+        final NodeId nodeId = createNodeId(peerAddress);
         nodes.remove(nodeId);
         state.remove(nodeId);
         if (nodeState != null) {
@@ -255,10 +269,14 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
         }
 
         return RpcResultBuilder.<Void>failed()
-            .withError(RpcError.ErrorType.RPC, "Failed to find session " + nodeId)
+            .withError(ErrorType.RPC, "Failed to find session " + nodeId)
             .buildFuture();
     }
 
+    final @NonNull Timer timer() {
+        return timer;
+    }
+
     final short getRpcTimeout() {
         return rpcTimeout;
     }
@@ -273,14 +291,12 @@ class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessio
         }
     }
 
-    @Override
-    public final synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
+    final synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
             final PcepSessionState sessionState) {
         dependencies.getStateRegistry().bind(nodeId, sessionState);
     }
 
-    @Override
-    public final synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
+    final synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
         dependencies.getStateRegistry().unbind(nodeId);
     }