2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.pcep.topology.provider;
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.concurrent.ExecutionException;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.checkerframework.checker.lock.qual.Holding;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
23 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
24 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
25 import org.opendaylight.yangtools.concepts.Registration;
26 import org.opendaylight.yangtools.yang.common.Empty;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * Glue between asynchronous lifecycle driven by {@link PCEPTopologyTracker} and asynchronous instantiation of a
32 * {@link ClusterSingletonService} for a particular topology.
34 final class PCEPTopologySingleton {
// The lifecycle is modelled as a small state machine. The state classes declared in this file are Active, Waiting
// and Terminating; transitions happen in destroy(), becomeActive() and becomeTerminating().
35 // Common class for possible states
36 private abstract static class State {
40 // The state for alive-and-kickin'. We are registered for cluster-wide instantiation
// Non-static inner class: it reads the enclosing instance's tracker and topology fields.
41 private final class Active extends State implements ClusterSingletonService {
// Scheduler created for this topology's id; it is closed once the termination future completes (see terminate()).
42 private final InstructionScheduler scheduler;
// Non-null once either closeServiceInstance() or lockedTerminate() has started shutting the service down.
45 private SettableFuture<Empty> closeFuture;
// The running topology instance, assigned by instantiateServiceInstance().
47 private PCEPTopologyInstance instance;
// Cluster singleton registration; lockedTerminate() verifies it is non-null before proceeding.
49 private Registration reg;
// Initialization: create the scheduler first, then register this object as a cluster singleton service.
// NOTE(review): the constructor header is not visible in this excerpt.
52 scheduler = tracker.instructionSchedulerFactory.createInstructionScheduler(
53 topology.getTopologyId().getValue());
54 reg = tracker.singletonService.registerClusterSingletonService(this);
// The service group identifier is whatever the instruction scheduler reports.
58 public ServiceGroupIdentifier getIdentifier() {
59 return scheduler.getIdentifier();
// Creates the PCEPTopologyInstance for this topology. Runs under this Active object's monitor.
63 public synchronized void instantiateServiceInstance() {
// Skip path: logged when this state has already been closed.
// NOTE(review): the guard condition and early return are not visible in this excerpt -- presumably a closeFuture
// check; confirm against the full file.
65 LOG.trace("Topology {} instance {} is closed, instantiation skipped", topologyId(),
66 PCEPTopologySingleton.this);
70 LOG.trace("Topology {} instance {} instantiating", topologyId(), PCEPTopologySingleton.this);
71 instance = new PCEPTopologyInstance(topology, tracker, scheduler);
// Shuts down the running instance. The work is split in two phases: state is adjusted first, and only then is a
// completion listener attached to the instance's termination future.
75 public ListenableFuture<?> closeServiceInstance() {
76 // First adjust our state under the lock...
77 final SettableFuture<Empty> close;
78 final ListenableFuture<?> ret;
81 LOG.trace("Topology {} instance {} closing", topologyId(), PCEPTopologySingleton.this);
// No shutdown in progress yet: allocate the future which will signal completion of this close.
// NOTE(review): the branch taken when closeFuture is already set is not visible in this excerpt.
82 if (closeFuture == null) {
83 close = closeFuture = SettableFuture.create();
// verifyNotNull() makes a missing instance a hard failure: this method expects instantiation to have happened.
88 ret = verifyNotNull(instance).terminate();
92 // ... and then add completion callback. Order of operations is significant, as we want to update our state
93 // before we give anybody a chance to react to closeFuture.
// The listener uses directExecutor(), i.e. it runs on whichever thread completes ret (or immediately on this
// thread if ret is already complete).
94 ret.addListener(() -> {
95 LOG.trace("Topology {} instance {} completing close", topologyId(), PCEPTopologySingleton.this);
// NOTE(review): statements between the two trace calls are not visible in this excerpt.
99 LOG.trace("Topology {} instance {} closed", topologyId(), PCEPTopologySingleton.this);
100 close.set(Empty.value());
101 }, MoreExecutors.directExecutor());
// Entry point used by destroy(): starts termination of this Active state. The locked part is delegated to
// lockedTerminate(); the scheduler is closed from a direct-executor listener once that future completes.
// NOTE(review): the return statement is not visible in this excerpt.
105 ListenableFuture<?> terminate() {
106 // Acquire the service termination future and shut down the scheduler once it completes
107 final var future = lockedTerminate();
108 future.addListener(scheduler::close, MoreExecutors.directExecutor());
112 // This part of the shutdown procedure needs to synchronize with instantiation and closure
113 private synchronized ListenableFuture<?> lockedTerminate() {
// A null registration is treated as proof that termination has already been started once.
114 verifyNotNull(reg, "Topology %s instance %s already terminating", topologyId(), PCEPTopologySingleton.this);
116 final ListenableFuture<?> ret;
117 if (closeFuture == null) {
118 // Service is not being closed, we need to create a future...
119 ret = closeFuture = SettableFuture.create();
120 if (instance == null) {
121 // ... and there is no instance, hence we need to also complete it immediately
122 closeFuture.set(Empty.value());
125 // Service close is already going on, reuse that future
// NOTE(review): the else-branch body, the registration close call and the return statement are not visible in
// this excerpt.
129 // Close the registration, potentially triggering closeServiceInstance(), which may even run on this thread
136 // Instance terminating, we never get out of this state
137 private static final class Terminating extends State {
// Future handed to becomeTerminating(); acquireCleanup() exposes it to resurrect() and awaitCleanup().
138 final @NonNull ListenableFuture<?> future;
// Rejects a null future with a NullPointerException.
140 Terminating(final ListenableFuture<?> future) {
141 this.future = requireNonNull(future);
145 // Instance is waiting for a previous incarnation to finish terminating
146 private static final class Waiting extends State {
// The previous incarnation's cleanup future; destroy() reuses it as the termination future if called while waiting.
147 final @NonNull ListenableFuture<?> future;
// Rejects a null future with a NullPointerException.
149 Waiting(final ListenableFuture<?> future) {
150 this.future = requireNonNull(future);
154 private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologySingleton.class);
// Owning tracker: supplies the scheduler factory and singleton service, and is notified via finishDestroy().
156 private final PCEPTopologyTracker tracker;
// Key of the topology this object manages; also the source of the id used in log messages.
157 private final TopologyKey topology;
// NOTE(review): nothing in this excerpt reads or writes cleanupFuture, and the declaration of the 'state' field
// used throughout is not visible here -- confirm both against the full file.
162 private SettableFuture<?> cleanupFuture;
164 private PCEPTopologySingleton(final PCEPTopologySingleton previous, final ListenableFuture<?> future) {
165 tracker = previous.tracker;
166 topology = previous.topology;
167 state = new Waiting(future);
168 future.addListener(this::becomeActive, MoreExecutors.directExecutor());
// Fresh-start constructor: both arguments must be non-null. The object is Active immediately, and constructing
// Active registers with the tracker's singleton service.
171 PCEPTopologySingleton(final PCEPTopologyTracker tracker, final TopologyKey topology) {
172 this.tracker = requireNonNull(tracker);
173 this.topology = requireNonNull(topology);
174 state = new Active();
// Starts destruction of this object, dispatching on the current state. Safe to call repeatedly: a call made while
// already Terminating only logs.
177 synchronized void destroy() {
178 if (state instanceof Active) {
179 LOG.trace("Starting destruction of topology {} instance {}", topologyId(), this);
// Termination completes when the Active state's terminate() future does.
180 becomeTerminating(((Active) state).terminate());
181 } else if (state instanceof Waiting) {
182 LOG.trace("Topology {} instance {} destroyed while waiting", topologyId(), this);
// Never became active: our termination is complete as soon as the future we were waiting on completes.
183 becomeTerminating(((Waiting) state).future);
// Remaining case: the only other legal state is Terminating, anything else fails verification.
185 verify(state instanceof Terminating, "Unexpected state %s", state);
186 LOG.trace("Topology {} instance {} is already being destroyed", topologyId(), this);
// Creates a successor for the same tracker and topology which waits for this object's cleanup to finish before
// becoming active. Fails verification (via acquireCleanup()) unless this object is Terminating.
190 @NonNull PCEPTopologySingleton resurrect() {
191 return new PCEPTopologySingleton(this, acquireCleanup());
// Blocks the calling thread until the termination future completes. Fails verification (via acquireCleanup())
// unless this object is Terminating. The visible handlers log failures instead of rethrowing them.
194 void awaitCleanup() {
// get() is invoked outside of this object's monitor: only acquireCleanup() itself is synchronized.
196 acquireCleanup().get();
197 } catch (InterruptedException e) {
198 LOG.info("Interrupted while waiting for topology {} cleanup", topologyId(), e);
// Restore the interrupt flag so callers up the stack can still observe the interruption.
199 Thread.currentThread().interrupt();
200 } catch (ExecutionException e) {
201 LOG.error("Topology {} cleanup failed", topologyId(), e);
// Returns the termination future under this object's monitor; fails verification if the state is not Terminating.
205 private synchronized @NonNull ListenableFuture<?> acquireCleanup() {
206 return verifyTerminating().future;
// Listener attached by the resurrection constructor: invoked when the previous incarnation's cleanup future
// completes.
209 private synchronized void becomeActive() {
// Only a still-Waiting object is activated; constructing Active registers with the singleton service.
210 if (state instanceof Waiting) {
211 LOG.trace("Topology {} instance {} becoming active", topologyId(), this);
212 state = new Active();
// Otherwise the state has already moved on (destroy() turns Waiting into Terminating), so activation is skipped.
215 LOG.trace("Skipping activation of terminated topology {} instance {}", topologyId(), this);
// Switches to the Terminating state and arranges for the tracker to be told when 'future' completes.
// The only visible caller, destroy(), holds this object's monitor; this method does not synchronize itself.
220 private void becomeTerminating(final ListenableFuture<?> future) {
221 state = new Terminating(future);
// directExecutor(): finishDestroy() runs on the thread completing the future, or immediately if already complete.
222 future.addListener(() -> tracker.finishDestroy(topology, this), MoreExecutors.directExecutor());
// Returns the current state as Terminating, failing verification with a descriptive message for any other state.
// Does not synchronize itself; the visible caller, acquireCleanup(), is synchronized.
226 private Terminating verifyTerminating() {
227 verify(state instanceof Terminating, "Unexpected topology %s instance %s state %s", topologyId(), this, state);
228 return (Terminating) state;
// Topology identifier as rendered by TopologyUtils.friendlyId(); used in this class's log and verify messages.
231 private String topologyId() {
232 return TopologyUtils.friendlyId(topology);