2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.pcep.topology.provider;
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.concurrent.ExecutionException;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.checkerframework.checker.lock.qual.Holding;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
23 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
24 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
25 import org.opendaylight.yangtools.concepts.Registration;
26 import org.opendaylight.yangtools.yang.common.Empty;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * Glue between asynchronous lifecycle driven by {@link PCEPTopologyTracker} and asynchronous instantiation of a
32 * {@link ClusterSingletonService} for a particular topology.
34 final class PCEPTopologySingleton {
35 // Common class for possible states
36 private abstract static class State {
40 // The state for alive-and-kickin'. We are registered for cluster-wide instantiation
41 private final class Active extends State implements ClusterSingletonService {
42 private final InstructionScheduler scheduler;
45 private SettableFuture<Empty> closeFuture;
47 private PCEPTopologyInstance instance;
49 private Registration reg;
52 scheduler = tracker.instructionSchedulerFactory.createInstructionScheduler(
53 topology.getTopologyId().getValue());
54 reg = tracker.singletonService.registerClusterSingletonService(this);
58 public ServiceGroupIdentifier getIdentifier() {
59 return scheduler.getIdentifier();
63 public synchronized void instantiateServiceInstance() {
65 LOG.trace("Topology {} instance {} is closed, instantiation skipped", topologyId(),
66 PCEPTopologySingleton.this);
70 LOG.trace("Topology {} instance {} instantiating", topologyId(), PCEPTopologySingleton.this);
71 instance = new PCEPTopologyInstance(topology, tracker, scheduler);
75 public ListenableFuture<?> closeServiceInstance() {
76 // First adjust our state under the lock...
77 final SettableFuture<Empty> close;
78 final ListenableFuture<?> ret;
81 LOG.trace("Topology {} instance {} closing", topologyId(), PCEPTopologySingleton.this);
82 if (closeFuture == null) {
83 close = closeFuture = SettableFuture.create();
88 ret = verifyNotNull(instance).terminate();
92 // ... and then add completion callback. Order of operations is significant, as we want to update our state
93 // before we give anybody a chance to react to closeFuture.
94 ret.addListener(() -> {
95 LOG.trace("Topology {} instance {} completing close", topologyId(), PCEPTopologySingleton.this);
99 LOG.trace("Topology {} instance {} closed", topologyId(), PCEPTopologySingleton.this);
100 close.set(Empty.value());
101 }, MoreExecutors.directExecutor());
105 ListenableFuture<?> terminate() {
106 // Acquire the service termination future and shut down the scheduler once it completes
107 final var future = lockedTerminate();
108 future.addListener(scheduler::close, MoreExecutors.directExecutor());
112 // This part of the shutdown procedure needs to synchronize with instantiation and closure
113 private synchronized ListenableFuture<?> lockedTerminate() {
114 verifyNotNull(reg, "Topology %s instance %s already terminating", topologyId(), PCEPTopologySingleton.this);
116 final ListenableFuture<?> ret;
117 if (closeFuture == null) {
118 // Service is not being closed, we need to create a future...
119 ret = closeFuture = SettableFuture.create();
120 if (instance == null) {
121 // ... and there is no instance, hence we need to also compete it immediate
122 closeFuture.set(Empty.value());
125 // Service close is already going on, reuse that future
129 // Close the registration, potentially triggering closeServiceInstance(), which may even run on this thread
136 // Instance terminating, we never get out of this state
137 private static final class Terminating extends State {
138 final @NonNull ListenableFuture<?> future;
140 Terminating(final ListenableFuture<?> future) {
141 this.future = requireNonNull(future);
145 // Instance is waiting for a previous incarnation to finish terminating
146 private static final class Waiting extends State {
147 final @NonNull ListenableFuture<?> future;
149 Waiting(final ListenableFuture<?> future) {
150 this.future = requireNonNull(future);
154 private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologySingleton.class);
156 private final PCEPTopologyTracker tracker;
157 private final TopologyKey topology;
162 private PCEPTopologySingleton(final PCEPTopologySingleton previous, final ListenableFuture<?> future) {
163 tracker = previous.tracker;
164 topology = previous.topology;
165 state = new Waiting(future);
166 future.addListener(this::becomeActive, MoreExecutors.directExecutor());
169 PCEPTopologySingleton(final PCEPTopologyTracker tracker, final TopologyKey topology) {
170 this.tracker = requireNonNull(tracker);
171 this.topology = requireNonNull(topology);
172 state = new Active();
175 synchronized void destroy() {
176 if (state instanceof Active) {
177 LOG.trace("Starting destruction of topology {} instance {}", topologyId(), this);
178 becomeTerminating(((Active) state).terminate());
179 } else if (state instanceof Waiting) {
180 LOG.trace("Topology {} instance {} destroyed while waiting", topologyId(), this);
181 becomeTerminating(((Waiting) state).future);
183 verify(state instanceof Terminating, "Unexpected state %s", state);
184 LOG.trace("Topology {} instance {} is already being destroyed", topologyId(), this);
188 @NonNull PCEPTopologySingleton resurrect() {
189 return new PCEPTopologySingleton(this, acquireCleanup());
192 void awaitCleanup() {
194 acquireCleanup().get();
195 } catch (InterruptedException e) {
196 LOG.info("Interrupted while waiting for topology {} cleanup", topologyId(), e);
197 Thread.currentThread().interrupt();
198 } catch (ExecutionException e) {
199 LOG.error("Topology {} cleanup failed", topologyId(), e);
203 private synchronized @NonNull ListenableFuture<?> acquireCleanup() {
204 return verifyTerminating().future;
207 private synchronized void becomeActive() {
208 if (state instanceof Waiting) {
209 LOG.trace("Topology {} instance {} becoming active", topologyId(), this);
210 state = new Active();
213 LOG.trace("Skipping activation of terminated topology {} instance {}", topologyId(), this);
218 private void becomeTerminating(final ListenableFuture<?> future) {
219 state = new Terminating(future);
220 future.addListener(() -> tracker.finishDestroy(topology, this), MoreExecutors.directExecutor());
224 private Terminating verifyTerminating() {
225 verify(state instanceof Terminating, "Unexpected topology %s instance %s state %s", topologyId(), this, state);
226 return (Terminating) state;
229 private String topologyId() {
230 return TopologyUtils.friendlyId(topology);