import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.bgpcep.pcep.server.PceServerProvider;
-import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.graph.rev191125.graph.topology.GraphKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.AddLspArgs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev200120.EnsureLspOperationalInput;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypesBuilder;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Non-final for testing
-class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs, TopologySessionStatsRegistry {
+class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs {
private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class);
private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5);
private final @NonNull KeyedInstanceIdentifier<Topology, TopologyKey> topology;
private final @NonNull PCEPTopologyProviderDependencies dependencies;
+ private final @NonNull HashedWheelTimer timer = new HashedWheelTimer();
@VisibleForTesting
final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile short rpcTimeout;
+ private final GraphKey graphKey;
+
ServerSessionManager(final KeyedInstanceIdentifier<Topology, TopologyKey> instanceIdentifier,
- final PCEPTopologyProviderDependencies dependencies, final short rpcTimeout) {
+ final PCEPTopologyProviderDependencies dependencies, final short rpcTimeout, final GraphKey graphKey) {
this.dependencies = requireNonNull(dependencies);
topology = requireNonNull(instanceIdentifier);
this.rpcTimeout = rpcTimeout;
+ this.graphKey = requireNonNull(graphKey);
}
// Initialize the operational view of the topology.
final ListenableFuture<Boolean> start() {
LOG.info("Creating PCEP Topology {}", topologyId());
-
final var tx = dependencies.getDataBroker().newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder()
.withKey(topology.getKey())
// Register this new topology to PCE Server
final PceServerProvider server = dependencies.getPceServerProvider();
if (server != null) {
- server.registerPcepTopology(topology);
+ server.registerPcepTopology(topology, graphKey);
}
return future;
}
LOG.error("Session Manager has already been closed.");
return CommitInfo.emptyFluentFuture();
}
+
+ // Clean up sessions
for (final TopologySessionListener node : nodes.values()) {
node.close();
}
nodes.clear();
+
+ // Clean up remembered metadata
for (final TopologyNodeState topologyNodeState : state.values()) {
topologyNodeState.close();
}
state.clear();
+ // Stop the timer
+ final var cancelledTasks = timer.stop().size();
+ if (cancelledTasks != 0) {
+ LOG.warn("Stopped timer with {} remaining tasks", cancelledTasks);
+ }
+
// Un-Register Pcep Topology into PCE Server
final PceServerProvider server = dependencies.getPceServerProvider();
if (server != null) {
return future;
}
- final synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session,
+ final synchronized void releaseNodeState(final TopologyNodeState nodeState, final InetAddress peerAddress,
final boolean persistNode) {
if (isClosed.get()) {
LOG.error("Session Manager has already been closed.");
return;
}
- final NodeId nodeId = createNodeId(session.getRemoteAddress());
+ final NodeId nodeId = createNodeId(peerAddress);
nodes.remove(nodeId);
state.remove(nodeId);
if (nodeState != null) {
}
return RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.RPC, "Failed to find session " + nodeId)
+ .withError(ErrorType.RPC, "Failed to find session " + nodeId)
.buildFuture();
}
+ final @NonNull Timer timer() {
+ return timer;
+ }
+
final short getRpcTimeout() {
return rpcTimeout;
}
}
}
- @Override
- public final synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
+ final synchronized void bind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId,
final PcepSessionState sessionState) {
dependencies.getStateRegistry().bind(nodeId, sessionState);
}
- @Override
- public final synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
+ final synchronized void unbind(final KeyedInstanceIdentifier<Node, NodeKey> nodeId) {
dependencies.getStateRegistry().unbind(nodeId);
}