2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.singleton.dom.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.base.Verify.verifyNotNull;
14 import static java.util.Objects.requireNonNull;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.MoreObjects;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.HashMap;
26 import java.util.List;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.checkerframework.checker.lock.qual.GuardedBy;
33 import org.checkerframework.checker.lock.qual.Holding;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
36 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
37 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
38 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Implementation of {@link ServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
46 * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
49 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
50 * a result of new candidates appearing. We use two entities:
52 * <li>service entity, to which all nodes register</li>
53 * <li>cleanup entity, which only the service entity owner registers to</li>
57 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
58 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
59 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
60 * granted until the old owner finishes the cleanup.
62 final class ActiveServiceGroup extends ServiceGroup {
64 private enum EntityState {
66 * This entity was never registered.
70 * Registration exists, but we are waiting for it to resolve.
74 * Registration indicated we are the owner.
78 * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
79 * another partition, for example.
83 * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
84 * do not need an UNOWNED_JEOPARDY state.
91 * Local service is up and running.
93 // FIXME: we should support async startup, which will require a STARTING state.
96 * Local service is being stopped.
101 private static final Logger LOG = LoggerFactory.getLogger(ActiveServiceGroup.class);
103 private final @NonNull DOMEntityOwnershipService entityOwnershipService;
104 private final @NonNull String identifier;
106 /* Entity instances */
107 private final @NonNull DOMEntity serviceEntity;
108 private final @NonNull DOMEntity cleanupEntity;
110 private final Set<ServiceRegistration> members = ConcurrentHashMap.newKeySet();
112 private final Map<ServiceRegistration, ServiceInfo> services = new HashMap<>();
114 // Marker for when any state changed
115 private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> DIRTY_UPDATER =
116 AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "dirty");
117 private volatile int dirty;
119 // Simplified lock: non-reentrant, support tryLock() only
120 private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> LOCK_UPDATER =
121 AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "lock");
122 @SuppressWarnings("unused")
123 private volatile int lock;
126 * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
127 * - user calling close()
128 * - service entity ownership
129 * - cleanup entity ownership
130 * - service shutdown future
132 * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
133 * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
134 * due to boilerplate overhead.
136 * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
137 * based on recorded bits -- without explicit representation of state machine state.
140 * Group close future. It can only go from null to non-null reference. Whenever it is non-null, it indicates that
141 * the user has closed the group and we are converging to termination.
143 // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
144 // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
145 // XXX: above needs a microbenchmark if contention ever becomes a problem.
146 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
149 * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
150 * acquire {@link #cleanupEntity}.
153 private Registration serviceEntityReg = null;
155 * Service (base) entity last reported state.
158 private EntityState serviceEntityState = EntityState.UNREGISTERED;
161 * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
165 private Registration cleanupEntityReg;
167 * Cleanup (owner) entity last reported state.
170 private EntityState cleanupEntityState = EntityState.UNREGISTERED;
172 private volatile boolean initialized;
175 * Class constructor. Note: last argument is reused as-is.
177 * @param identifier non-empty string as identifier
178 * @param serviceEntity as Entity instance
179 * @param cleanupEntity as Entity instance
180 * @param entityOwnershipService GenericEntityOwnershipService instance
181 * @param parent parent service
182 * @param services Services list
// Primary constructor: validates the arguments, stores the collaborators and seeds the member set.
184 ActiveServiceGroup(final String identifier, final DOMEntityOwnershipService entityOwnershipService,
185 final DOMEntity serviceEntity, final DOMEntity cleanupEntity, final List<ServiceRegistration> services) {
// Dereferencing identifier here also rejects a null identifier with a NullPointerException.
186 checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
187 this.identifier = identifier;
188 this.entityOwnershipService = requireNonNull(entityOwnershipService);
189 this.serviceEntity = requireNonNull(serviceEntity);
190 this.cleanupEntity = requireNonNull(cleanupEntity);
// Copy the supplied registrations into the concurrent member set.
191 members.addAll(services);
193 LOG.debug("Instantiated new service group for {}", identifier);
// Convenience constructor: delegates to the five-argument constructor with an empty service list.
// Note that the argument order differs: here the entities come before the ownership service.
197 ActiveServiceGroup(final String identifier, final DOMEntity serviceEntity,
198 final DOMEntity cleanupEntity, final DOMEntityOwnershipService entityOwnershipService) {
199 this(identifier, entityOwnershipService, serviceEntity, cleanupEntity, ImmutableList.of());
203 public String getIdentifier() {
// Closes this group: the close bookkeeping itself is delegated to destroyGroup().
208 ListenableFuture<?> closeClusterSingletonGroup() {
209 final var ret = destroyGroup();
// NOTE(review): presumably logged when reconciliation cannot run right away -- confirm against tryLock() usage.
216 LOG.debug("Service group {} postponing sync on close", identifier);
// The group counts as closed as soon as a close future has been published in closeFuture.
222 private boolean isClosed() {
223 return closeFuture.get() != null;
// Registers this node as a candidate for the service entity. Throws IllegalStateException (via checkState) when
// the group has already been initialized.
227 void initialize() throws CandidateAlreadyRegisteredException {
// NOTE(review): the message below spells "initilized"; it is a runtime string, so it is left untouched here.
230 checkState(!initialized, "Singleton group %s was already initilized", identifier);
231 LOG.debug("Initializing service group {} with services {}", identifier, members);
232 synchronized (this) {
// The entity state is recorded as REGISTERED before the registration call is made.
233 serviceEntityState = EntityState.REGISTERED;
234 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
// Guard: throws IllegalStateException (via checkState) once a close of this group has been requested.
242 private void checkNotClosed() {
243 checkState(!isClosed(), "Service group %s has already been closed", identifier);
// Adds a service registration to this group. The group must have been initialized.
247 void registerService(final ServiceRegistration reg) {
// Rejects registrations whose service identifier does not match this group's identifier.
248 final var service = verifyRegistration(reg);
251 checkState(initialized, "Service group %s is not initialized yet", identifier);
253 // First put the service
254 LOG.debug("Adding service {} to service group {}", service, identifier);
// members is a set: add() returning false means the registration was already present, which verify() rejects.
255 verify(members.add(reg));
259 LOG.debug("Service group {} delayed register of {}", identifier, reg);
// Removes a service registration from this group. When the last member is removed, the call is handed over to
// destroyGroup() and its future is returned.
267 ListenableFuture<?> unregisterService(final ServiceRegistration reg) {
268 verifyRegistration(reg);
// remove() returning false means the registration was not a member, which verify() rejects.
271 verify(members.remove(reg));
273 if (members.isEmpty()) {
274 // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
275 // before we start applying state, because while we do not re-enter, the user is free to do whatever,
276 // notably including registering a service with the same ID from the service shutdown hook. That
277 // registration request needs to hit the successor of this group.
278 return destroyGroup();
284 LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
// Checks that the registered service belongs to this group by comparing its identifier value with ours.
// A mismatch makes verify() fail.
289 private ClusterSingletonService verifyRegistration(final ServiceRegistration reg) {
290 final var service = reg.getInstance();
291 verify(identifier.equals(service.getIdentifier().value()));
// Marks the group as closed and releases the service entity registration if one is still held.
295 private synchronized @NonNull ListenableFuture<?> destroyGroup() {
296 final var future = SettableFuture.<Void>create();
// Publish the new future only if none exists yet. compareAndExchange returns the previous value, so a non-null
// witness means a close has already been requested earlier.
297 final var witness = closeFuture.compareAndExchange(null, future);
298 if (witness != null) {
302 if (serviceEntityReg != null) {
303 // We are still holding the service registration, close it now...
304 LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
305 serviceEntityReg.close();
// Clearing the field ensures the null check above skips the close on any later invocation.
306 serviceEntityReg = null;
// Entry point for entity ownership notifications: the change is recorded while holding this object's monitor.
314 void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change, final boolean inJeopardy) {
315 synchronized (this) {
316 lockedOwnershipChanged(entity, change, inJeopardy);
// NOTE(review): presumably logged when reconciliation cannot run right away -- confirm against tryLock() usage.
321 LOG.debug("Service group {} postponing ownership change sync", identifier);
330 * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
331 * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
333 * @param change reported change
// Dispatches an ownership change to the handler matching the entity it was reported for.
336 private void lockedOwnershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
337 final boolean inJeopardy) {
338 if (serviceEntity.equals(entity)) {
339 serviceOwnershipChanged(change, inJeopardy);
341 } else if (cleanupEntity.equals(entity)) {
342 cleanupCandidateOwnershipChanged(change, inJeopardy);
// Neither of our two entities: the event is only logged.
345 LOG.warn("Group {} received unrecognized entity {}", identifier, entity);
// Translates an ownership change reported for the cleanup entity into cleanupEntityState.
350 private void cleanupCandidateOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
// Uncertain mapping (presumably selected by the jeopardy flag -- confirm): local ownership becomes
// OWNED_JEOPARDY, every other listed change becomes UNOWNED.
352 cleanupEntityState = switch (state) {
353 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> {
354 LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
355 yield EntityState.OWNED_JEOPARDY;
357 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
358 REMOTE_OWNERSHIP_LOST_NO_OWNER -> {
359 LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
360 yield EntityState.UNOWNED;
366 if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
367 // Pair info message with previous jeopardy
368 LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
// Certain mapping: local ownership becomes OWNED, every other listed change becomes UNOWNED.
371 cleanupEntityState = switch (state) {
372 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED;
373 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_LOST_NO_OWNER,
374 REMOTE_OWNERSHIP_CHANGED -> EntityState.UNOWNED;
// Translates an ownership change reported for the service entity into serviceEntityState.
379 private void serviceOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
// Uncertain mapping (presumably selected by the jeopardy flag -- confirm): local ownership becomes
// OWNED_JEOPARDY, every other listed change becomes UNOWNED.
381 LOG.info("Service group {} service entity ownership uncertain", identifier);
382 serviceEntityState = switch (state) {
383 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED_JEOPARDY;
384 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
385 REMOTE_OWNERSHIP_LOST_NO_OWNER -> EntityState.UNOWNED;
390 if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
391 // Pair info message with previous jeopardy
392 LOG.info("Service group {} service entity ownership ascertained", identifier);
// Certain mapping: local ownership becomes OWNED.
396 case LOCAL_OWNERSHIP_GRANTED:
397 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
398 LOG.debug("Service group {} acquired service entity ownership", identifier);
399 serviceEntityState = EntityState.OWNED;
// Certain mapping: any loss of local ownership or remote change becomes UNOWNED.
401 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
402 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
403 case REMOTE_OWNERSHIP_CHANGED:
404 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
405 LOG.debug("Service group {} lost service entity ownership", identifier);
406 serviceEntityState = EntityState.UNOWNED;
// NOTE(review): this handler deals with the service entity, yet the message says "cleanup entity" -- looks like
// a copy/paste slip; it is a runtime string, so it is left untouched here.
409 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
413 // has to be called with lock asserted, which will be released prior to returning
414 private void reconcileState() {
415 // Always check if there is any state change to be applied.
// conditionalClean() atomically clears the dirty marker and reports whether it had been set.
418 if (conditionalClean()) {
422 // We may have run a round of reconciliation, but any one of these may have happened asynchronously:
425 // - service future completed
426 // - entity state changed
428 // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
429 // be dirty again. This closes the following race condition:
431 // A: runs these checks holding the lock
432 // B: modifies them, fails to acquire lock
433 // A: releases lock -> no one takes care of reconciliation
440 LOG.debug("Service group {} re-running reconciliation", identifier);
444 LOG.debug("Service group {} will be reconciled by someone else", identifier);
446 LOG.debug("Service group {} is completely reconciled", identifier);
453 private void serviceTransitionCompleted() {
460 // Has to be called with lock asserted
// Single reconciliation pass: snapshots the ownership state, registers or releases the cleanup entity as needed,
// starts or stops services accordingly and completes the close future once everything has been released.
461 private void tryReconcileState() {
462 // First take a safe snapshot of current state on which we will base our decisions.
463 final Set<ServiceRegistration> localMembers;
464 final boolean haveCleanup;
465 final boolean haveService;
466 synchronized (this) {
467 if (serviceEntityReg != null) {
// The service entity counts as held when OWNED or OWNED_JEOPARDY.
468 haveService = switch (serviceEntityState) {
469 case OWNED, OWNED_JEOPARDY -> true;
470 case REGISTERED, UNOWNED, UNREGISTERED -> false;
476 if (haveService && cleanupEntityReg == null) {
477 // We have the service entity but have not registered for cleanup entity. Do that now and retry.
478 LOG.debug("Service group {} registering cleanup entity", identifier);
480 cleanupEntityState = EntityState.REGISTERED;
481 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
482 } catch (CandidateAlreadyRegisteredException e) {
// Registration of the cleanup entity failed: give up the service entity registration as well.
483 LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
484 if (serviceEntityReg != null) {
485 serviceEntityReg.close();
486 serviceEntityReg = null;
493 if (cleanupEntityReg != null) {
// Unlike the service entity, OWNED_JEOPARDY does not count as holding the cleanup entity.
494 haveCleanup = switch (cleanupEntityState) {
496 case OWNED_JEOPARDY, REGISTERED, UNOWNED, UNREGISTERED -> false;
// Immutable copy, so the decisions below are based on one consistent view of the members.
502 localMembers = ImmutableSet.copyOf(members);
505 if (haveService && haveCleanup) {
506 ensureServicesStarting(localMembers);
510 ensureServicesStopping();
512 if (!haveService && services.isEmpty()) {
513 LOG.debug("Service group {} has no running services", identifier);
514 final boolean canFinishClose;
515 synchronized (this) {
516 if (cleanupEntityReg != null) {
517 LOG.debug("Service group {} releasing cleanup entity", identifier);
518 cleanupEntityReg.close();
519 cleanupEntityReg = null;
// Close may only finish once the cleanup entity is reported UNOWNED or was never registered.
522 canFinishClose = switch (cleanupEntityState) {
523 case OWNED, OWNED_JEOPARDY, REGISTERED -> false;
524 case UNOWNED, UNREGISTERED -> true;
528 if (canFinishClose) {
// Complete the close future only if a close was requested and it has not been completed yet.
529 final SettableFuture<Void> localFuture = closeFuture.get();
530 if (localFuture != null && !localFuture.isDone()) {
531 LOG.debug("Service group {} completing termination", identifier);
532 localFuture.set(null);
538 // Has to be called with lock asserted
// Brings the tracked services in line with localConfig: tracked services that are no longer configured are
// pushed towards stopping, configured services without a tracking entry are started.
539 @SuppressWarnings("illegalCatch")
540 private void ensureServicesStarting(final Set<ServiceRegistration> localConfig) {
541 LOG.debug("Service group {} starting services", identifier);
543 // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
544 // example when this method is executed as part of unregisterService() call. In that case we need to ensure
545 // services in the list are stopping
546 final var it = services.entrySet().iterator();
547 while (it.hasNext()) {
548 final var entry = it.next();
549 final var reg = entry.getKey();
550 if (!localConfig.contains(reg)) {
551 final var newInfo = ensureStopping(reg, entry.getValue());
// A non-null result is the updated tracking info and replaces the map entry in place.
552 if (newInfo != null) {
553 entry.setValue(newInfo);
560 // Now make sure member services are being juggled around
561 for (var reg : localConfig) {
// Only registrations without a tracking entry are started here.
562 if (!services.containsKey(reg)) {
563 final var service = reg.getInstance();
564 LOG.debug("Starting service {}", service);
567 service.instantiateServiceInstance();
568 } catch (Exception e) {
569 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
574 services.put(reg, ServiceInfo.STARTED);
579 // Has to be called with lock asserted
// Pushes every tracked service towards stopping by running ensureStopping() on each map entry.
580 private void ensureServicesStopping() {
581 final var it = services.entrySet().iterator();
582 while (it.hasNext()) {
583 final var entry = it.next();
584 final var newInfo = ensureStopping(entry.getKey(), entry.getValue());
// A non-null result is the updated tracking info and replaces the map entry in place.
585 if (newInfo != null) {
586 entry.setValue(newInfo);
// Advances a single service towards the stopped state, based on its currently tracked state.
// Callers treat a non-null return value as the replacement tracking info for the service.
593 @SuppressWarnings("illegalCatch")
594 private ServiceInfo ensureStopping(final ServiceRegistration reg, final ServiceInfo info) {
595 switch (info.getState()) {
597 final var service = reg.getInstance();
599 LOG.debug("Service group {} stopping service {}", identifier, service);
600 final @NonNull ListenableFuture<?> future;
// verifyNotNull() rejects services which return a null future from closeServiceInstance().
602 future = verifyNotNull(service.closeServiceInstance());
603 } catch (Exception e) {
604 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
// Success and failure both end up in serviceTransitionCompleted(); with directExecutor() the callback runs on
// whichever thread completes the future, or immediately if the future is already done.
609 Futures.addCallback(future, new FutureCallback<Object>() {
611 public void onSuccess(final Object result) {
612 LOG.debug("Service group {} service {} stopped successfully", identifier, service);
613 serviceTransitionCompleted();
617 public void onFailure(final Throwable cause) {
618 LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
619 serviceTransitionCompleted();
621 }, MoreExecutors.directExecutor());
// Track the service as STOPPING together with the future that signals completion of the stop.
622 return info.toState(ServiceState.STOPPING, future);
// The stop future of an already stopping service has completed.
624 if (info.getFuture().isDone()) {
625 LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
630 throw new IllegalStateException("Unhandled state " + info.getState());
634 private void markDirty() {
638 private boolean isDirty() {
// Atomically resets the dirty marker from 1 to 0; returns true only if the marker had been set.
642 private boolean conditionalClean() {
643 return DIRTY_UPDATER.compareAndSet(this, 1, 0);
// Non-blocking acquire of the simplified lock: succeeds only if the lock field moves from 0 to 1.
646 private boolean tryLock() {
647 return LOCK_UPDATER.compareAndSet(this, 0, 1);
// Releases the simplified lock by moving the lock field from 1 back to 0.
650 private boolean unlock() {
// verify() fails if the lock was not held (the field was not 1) at the time of the call.
651 verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
// Debug representation containing only the group identifier.
656 public String toString() {
657 return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();