X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=pcep%2Ftopology%2Ftopology-provider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fbgpcep%2Fpcep%2Ftopology%2Fprovider%2FServerSessionManager.java;h=46f58f183dbd8ab2da76430d3ac607bac3fa20ed;hb=c172fd0e6182f9125e8061af80444e8026ec84e4;hp=5efdfca25264e7082bd7caa39d9c334e67997394;hpb=067c3fbf1a5e3f622c577d1cd32cde8a1db54594;p=bgpcep.git diff --git a/pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java b/pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java old mode 100755 new mode 100644 index 5efdfca252..46f58f183d --- a/pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java +++ b/pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java @@ -9,166 +9,230 @@ package org.opendaylight.bgpcep.pcep.topology.provider; import static java.util.Objects.requireNonNull; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.util.Timeout; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.bgpcep.pcep.topology.spi.stats.TopologySessionStatsRegistry; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.protocol.pcep.PCEPPeerProposal; -import org.opendaylight.protocol.pcep.PCEPSession; -import org.opendaylight.protocol.pcep.PCEPSessionListener; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.bgpcep.pcep.server.PceServerProvider; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.PcepSessionState; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.TlvsBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.AddLspArgs; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.EnsureLspOperationalInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.OperationResult; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.RemoveLspArgs; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.TearDownSessionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.TopologyTypes1; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.TopologyTypes1Builder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.TriggerSyncArgs; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.UpdateLspArgs; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.topology.pcep.type.TopologyPcepBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.graph.rev220720.graph.topology.GraphKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev200720.SrpIdNumber; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.AddLspArgs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.EnsureLspOperationalInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.OperationResult; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.RemoveLspArgs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TearDownSessionInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TearDownSessionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TopologyTypes1Builder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.TriggerSyncArgs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.UpdateLspArgs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev220730.network.topology.topology.topology.types.TopologyPcepBuilder; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypesBuilder; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs, PCEPPeerProposal, - TopologySessionStatsRegistry { +// Non-final for testing +class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs { + @FunctionalInterface + interface RpcTimeout { + void run(SrpIdNumber requestId); + } + private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class); private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5); - private static final String FAILURE_MSG = "Failed to find session"; - @VisibleForTesting - final AtomicBoolean isClosed = new AtomicBoolean(false); + private static final VarHandle CLOSED; + + static { + try { + CLOSED = MethodHandles.lookup().findVarHandle(ServerSessionManager.class, "closed", boolean.class); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final @NonNull KeyedInstanceIdentifier topology; + private final @NonNull PCEPTopologyProviderDependencies dependencies; + private final @NonNull GraphKey graphKey; + @GuardedBy("this") private final Map nodes = new HashMap<>(); @GuardedBy("this") private final Map state = new HashMap<>(); - private final TopologySessionListenerFactory listenerFactory; - private final InstanceIdentifier topology; - private final DataBroker broker; - private final PCEPStatefulPeerProposal peerProposal; - private final short rpcTimeout; - private final TopologySessionStatsRegistry statsRegistry; - - public ServerSessionManager(final DataBroker broker, - final InstanceIdentifier topology, - final TopologySessionListenerFactory listenerFactory, - final TopologySessionStatsRegistry statsRegistry, - final short rpcTimeout) { - this.broker = requireNonNull(broker); + + private volatile long updateInterval; + private volatile short rpcTimeout; + private volatile boolean closed; + + ServerSessionManager(final KeyedInstanceIdentifier topology, + final PCEPTopologyProviderDependencies dependencies, final GraphKey graphKey, + final short rpcTimeout, final long updateInterval) { + this.dependencies = requireNonNull(dependencies); this.topology = requireNonNull(topology); - this.statsRegistry = requireNonNull(statsRegistry); - this.listenerFactory = requireNonNull(listenerFactory); - this.peerProposal = PCEPStatefulPeerProposal.createStatefulPeerProposal(this.broker, this.topology); + this.graphKey = requireNonNull(graphKey); this.rpcTimeout = rpcTimeout; + this.updateInterval = updateInterval; } - private static NodeId createNodeId(final InetAddress addr) { - return new NodeId("pcc://" + addr.getHostAddress()); + // Initialize the operational view of the topology. + final ListenableFuture start() { + LOG.info("Creating PCEP Topology {}", topologyId()); + final var tx = dependencies.getDataBroker().newWriteOnlyTransaction(); + tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder() + .withKey(topology.getKey()) + .setTopologyTypes(new TopologyTypesBuilder() + .addAugmentation(new TopologyTypes1Builder().setTopologyPcep(new TopologyPcepBuilder().build()).build()) + .build()) + .build()); + + final var future = SettableFuture.create(); + final var txFuture = tx.commit(); + txFuture.addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.info("PCEP Topology {} created successfully.", topologyId()); + closed = false; + future.set(Boolean.TRUE); + } + + @Override + public void onFailure(final Throwable failure) { + LOG.error("Failed to create PCEP Topology {}.", topologyId(), failure); + closed = true; + future.set(Boolean.FALSE); + } + }, MoreExecutors.directExecutor()); + + // Register this new topology to PCE Server + final PceServerProvider server = dependencies.getPceServerProvider(); + if (server != null) { + server.registerPcepTopology(topology, graphKey); + } + return future; } - /** - * Create Base Topology - */ - synchronized ListenableFuture instantiateServiceInstance() { - final TopologyKey key = InstanceIdentifier.keyOf(this.topology); - final TopologyId topologyId = key.getTopologyId(); - final WriteTransaction tx = this.broker.newWriteOnlyTransaction(); - tx.put(LogicalDatastoreType.OPERATIONAL, this.topology, new TopologyBuilder().setKey(key) - .setTopologyId(topologyId).setTopologyTypes(new TopologyTypesBuilder() - .addAugmentation(TopologyTypes1.class, new TopologyTypes1Builder().setTopologyPcep( - new TopologyPcepBuilder().build()).build()).build()) - .setNode(new ArrayList<>()).build(), true); - final ListenableFuture future = tx.submit(); - Futures.addCallback(future, new FutureCallback() { + final boolean isClosed() { + return closed; + } + + final synchronized FluentFuture stop() { + if (!CLOSED.compareAndSet(this, false, true)) { + LOG.error("Session Manager has already been closed."); + return CommitInfo.emptyFluentFuture(); + } + + // Clean up sessions + for (final TopologySessionListener node : nodes.values()) { + node.close(); + } + nodes.clear(); + + // Clean up remembered metadata + for (final TopologyNodeState topologyNodeState : state.values()) { + topologyNodeState.close(); + } + state.clear(); + + // Un-Register Pcep Topology into PCE Server + final PceServerProvider server = dependencies.getPceServerProvider(); + if (server != null) { + server.unRegisterPcepTopology(topology); + } + + final WriteTransaction t = dependencies.getDataBroker().newWriteOnlyTransaction(); + t.delete(LogicalDatastoreType.OPERATIONAL, topology); + final FluentFuture future = t.commit(); + future.addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { - LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue()); - ServerSessionManager.this.isClosed.set(false); + public void onSuccess(final CommitInfo result) { + LOG.debug("Topology {} removed", topology); } @Override - public void onFailure(final Throwable t) { - LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), t); - ServerSessionManager.this.isClosed.set(true); + public void onFailure(final Throwable throwable) { + LOG.warn("Failed to remove Topology {}", topology, throwable); } }, MoreExecutors.directExecutor()); return future; } - synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session, final boolean persistNode) { - if (this.isClosed.get()) { + final synchronized void releaseNodeState(final TopologyNodeState nodeState, final InetAddress peerAddress, + final boolean persistNode) { + if (isClosed()) { LOG.error("Session Manager has already been closed."); return; } - this.nodes.remove(createNodeId(session.getRemoteAddress())); + final NodeId nodeId = createNodeId(peerAddress); + nodes.remove(nodeId); + state.remove(nodeId); if (nodeState != null) { LOG.debug("Node {} unbound", nodeState.getNodeId()); nodeState.released(persistNode); } } - synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener, final boolean retrieveNode) { + final synchronized TopologyNodeState takeNodeState(final InetAddress address, + final TopologySessionListener sessionListener, final boolean retrieveNode) { final NodeId id = createNodeId(address); - if (this.isClosed.get()) { - LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, sessionListener); + if (isClosed()) { + LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, + sessionListener); return null; } LOG.debug("Node {} requested by listener {}", id, sessionListener); - TopologyNodeState ret = this.state.get(id); + TopologyNodeState ret = state.get(id); if (ret == null) { - ret = new TopologyNodeState(this.broker, this.topology, id, DEFAULT_HOLD_STATE_NANOS); + ret = new TopologyNodeState(dependencies.getDataBroker(), topology, id, DEFAULT_HOLD_STATE_NANOS); LOG.debug("Created topology node {} for id {} at {}", ret, id, ret.getNodeId()); - this.state.put(id, ret); + state.put(id, ret); } // if another listener requests the same session, close it - final TopologySessionListener existingSessionListener = this.nodes.get(id); + final TopologySessionListener existingSessionListener = nodes.get(id); if (existingSessionListener != null && !sessionListener.equals(existingSessionListener)) { - LOG.error("New session listener {} is in conflict with existing session listener {} on node {}, closing the existing one.", existingSessionListener, sessionListener, id); + LOG.error("New session listener {} is in conflict with existing session listener {} on node {}," + + " closing the existing one.", existingSessionListener, sessionListener, id); existingSessionListener.close(); } ret.taken(retrieveNode); - this.nodes.put(id, sessionListener); + nodes.put(id, sessionListener); LOG.debug("Node {} bound to listener {}", id, sessionListener); return ret; } + // Non-final for testing @Override - public PCEPSessionListener getSessionListener() { - return this.listenerFactory.createTopologySessionListener(this); + public PCEPTopologySessionListener getSessionListener() { + return new PCEPTopologySessionListener(dependencies.getStateRegistry(), this, + dependencies.getPceServerProvider()); } - protected final synchronized TopologySessionListener checkSessionPresence(final NodeId nodeId) { + private synchronized TopologySessionListener checkSessionPresence(final NodeId nodeId) { // Get the listener corresponding to the node - final TopologySessionListener l = this.nodes.get(nodeId); + final TopologySessionListener l = nodes.get(nodeId); if (l == null) { LOG.debug("Session for node {} not found", nodeId); return null; @@ -177,95 +241,82 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology } @Override - public synchronized ListenableFuture addLsp(final AddLspArgs input) { + public final synchronized ListenableFuture addLsp(final AddLspArgs input) { final TopologySessionListener l = checkSessionPresence(input.getNode()); return l != null ? l.addLsp(input) : OperationResults.UNSENT.future(); } @Override - public synchronized ListenableFuture removeLsp(final RemoveLspArgs input) { + public final synchronized ListenableFuture removeLsp(final RemoveLspArgs input) { final TopologySessionListener l = checkSessionPresence(input.getNode()); return l != null ? l.removeLsp(input) : OperationResults.UNSENT.future(); } @Override - public synchronized ListenableFuture updateLsp(final UpdateLspArgs input) { + public final synchronized ListenableFuture updateLsp(final UpdateLspArgs input) { final TopologySessionListener l = checkSessionPresence(input.getNode()); return l != null ? l.updateLsp(input) : OperationResults.UNSENT.future(); } @Override - public synchronized ListenableFuture ensureLspOperational(final EnsureLspOperationalInput input) { + public final synchronized ListenableFuture ensureLspOperational( + final EnsureLspOperationalInput input) { final TopologySessionListener l = checkSessionPresence(input.getNode()); return l != null ? l.ensureLspOperational(input) : OperationResults.UNSENT.future(); } @Override - public synchronized ListenableFuture triggerSync(final TriggerSyncArgs input) { + public final synchronized ListenableFuture triggerSync(final TriggerSyncArgs input) { final TopologySessionListener l = checkSessionPresence(input.getNode()); return l != null ? l.triggerSync(input) : OperationResults.UNSENT.future(); } @Override - public ListenableFuture> tearDownSession(final TearDownSessionInput input) { - final TopologySessionListener l = checkSessionPresence(input.getNode()); - if (l == null) { - return RpcResultBuilder.failed().withError(RpcError.ErrorType.RPC, - FAILURE_MSG).buildFuture(); + public final ListenableFuture> tearDownSession(final TearDownSessionInput input) { + final NodeId nodeId = input.getNode(); + final TopologySessionListener listener = checkSessionPresence(nodeId); + if (listener != null) { + return listener.tearDownSession(input); } - return l.tearDownSession(input); + return RpcResultBuilder.failed() + .withError(ErrorType.RPC, "Failed to find session " + nodeId) + .buildFuture(); } - synchronized ListenableFuture closeServiceInstance() { - if (this.isClosed.getAndSet(true)) { - LOG.error("Session Manager has already been closed."); - return Futures.immediateFuture(null); - } + final @Nullable Timeout newRpcTimeout(final RpcTimeout task, final SrpIdNumber requestId) { + final short localTimeout = rpcTimeout; + return localTimeout <= 0 ? null + : dependencies.getTimer().newTimeout(ignored -> task.run(requestId), localTimeout, TimeUnit.SECONDS); + } - for (final TopologySessionListener sessionListener : this.nodes.values()) { - sessionListener.close(); - } - this.nodes.clear(); - for (final TopologyNodeState nodeState : this.state.values()) { - nodeState.close(); - } - this.state.clear(); - final WriteTransaction t = this.broker.newWriteOnlyTransaction(); - t.delete(LogicalDatastoreType.OPERATIONAL, this.topology); - final ListenableFuture future = t.submit(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("Topology {} removed", ServerSessionManager.this.topology); - } + final long updateInterval() { + return isClosed() ? 0 : updateInterval; + } - @Override - public void onFailure(final Throwable t) { - LOG.warn("Failed to remove Topology {}", ServerSessionManager.this.topology, t); - } - }, MoreExecutors.directExecutor()); - return future; + final void setRpcTimeout(final short rpcTimeout) { + this.rpcTimeout = rpcTimeout; } - @Override - public void setPeerSpecificProposal(final InetSocketAddress address, final TlvsBuilder openBuilder) { - requireNonNull(address); - this.peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder); + final void setUpdateInterval(final long updateInterval) { + this.updateInterval = updateInterval; } - short getRpcTimeout() { - return this.rpcTimeout; + final void tearDownSessions(final List outdatedNodes) { + for (var address : outdatedNodes) { + tearDownSession(new TearDownSessionInputBuilder().setNode(createNodeId(address)).build()); + } } - @Override - public synchronized void bind(final KeyedInstanceIdentifier nodeId, - final PcepSessionState sessionState) { - this.statsRegistry.bind(nodeId, sessionState); + final PCEPTopologyProviderDependencies getPCEPTopologyProviderDependencies() { + return dependencies; } - @Override - public synchronized void unbind(final KeyedInstanceIdentifier nodeId) { - this.statsRegistry.unbind(nodeId); + static @NonNull NodeId createNodeId(final InetAddress addr) { + return new NodeId("pcc://" + addr.getHostAddress()); + } + + private @NonNull String topologyId() { + return TopologyUtils.friendlyId(topology); } } \ No newline at end of file