2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.singleton.dom.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.base.Verify.verifyNotNull;
14 import static java.util.Objects.requireNonNull;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.MoreObjects;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.Collection;
26 import java.util.HashMap;
27 import java.util.Iterator;
29 import java.util.Map.Entry;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33 import java.util.concurrent.atomic.AtomicReference;
34 import org.checkerframework.checker.lock.qual.GuardedBy;
35 import org.checkerframework.checker.lock.qual.Holding;
36 import org.eclipse.jdt.annotation.NonNull;
37 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
38 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
39 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
40 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
41 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
44 import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
45 import org.opendaylight.yangtools.concepts.Registration;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
51 * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
54 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
55 * a result of new candidates appearing. We use two entities:
56 * - service entity, to which all nodes register
57 * - cleanup entity, which only the service entity owner registers to
60 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
61 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
62 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
63 * granted until the old owner finishes the cleanup.
65 * @param <P> the instance identifier path type
66 * @param <E> the GenericEntity type
67 * @param <C> the GenericEntityOwnershipChange type
68 * @param <G> the GenericEntityOwnershipListener type
69 * @param <S> the GenericEntityOwnershipService type
71 final class ClusterSingletonServiceGroupImpl<P extends HierarchicalIdentifier<P>, E extends GenericEntity<P>,
72 G extends GenericEntityOwnershipListener<E>, S extends GenericEntityOwnershipService<E, G>>
73 extends ClusterSingletonServiceGroup<P, E> {
75 private enum EntityState {
77 * This entity was never registered.
81 * Registration exists, but we are waiting for it to resolve.
85 * Registration indicated we are the owner.
89 * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
90 * another partition, for example.
94 * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
95 * do not need an UNOWNED_JEOPARDY state.
102 * Local service is up and running.
104 // FIXME: we should support async startup, which will require a STARTING state.
107 * Local service is being stopped.
// NOTE(review): each line in this file carries a stray leading number (the original line number) and short lines
// such as annotations, braces and blank lines are absent from this view.
112 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
// Ownership service used to register this node as a candidate for the two entities below.
114 private final S entityOwnershipService;
// Group identifier; the constructor rejects an empty string.
115 private final String identifier;
117 /* Entity instances */
118 private final @NonNull E serviceEntity;
119 private final @NonNull E cleanupEntity;
// Service registrations that are members of this group, held in a concurrent key set.
121 private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
// Per-registration ServiceInfo for services this group has started; a plain (non-concurrent) HashMap.
123 private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
125 // Marker for when any state changed
126 @SuppressWarnings("rawtypes")
127 private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
128 AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
// Updated atomically through DIRTY_UPDATER; conditionalClean() flips it from 1 back to 0.
129 private volatile int dirty;
131 // Simplified lock: non-reentrant, support tryLock() only
132 @SuppressWarnings("rawtypes")
133 private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
134 AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
// Lock word, only touched through LOCK_UPDATER: tryLock() flips 0 -> 1, unlock() flips 1 -> 0.
135 @SuppressWarnings("unused")
136 private volatile int lock;
139 * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
140 * - user calling close()
141 * - service entity ownership
142 * - cleanup entity ownership
143 * - service shutdown future
145 * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
146 * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
147 * due to boilerplate overhead.
149 * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
150 * based on recorded bits -- without explicit representation of state machine state.
153 * Group close future. It can only go from null to non-null reference. Whenever it is non-null, it indicates that
154 * the user has closed the group and we are converging to termination.
156 // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
157 // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
158 // XXX: the above needs a microbenchmark if contention ever becomes a problem.
// Starts out empty; destroyGroup() installs the future with compareAndSet(null, future) and isClosed() tests it.
159 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
// NOTE(review): the "* ..." lines below read as Javadoc bodies whose opening and closing delimiters (and any
// annotations on the fields) are absent from this view; the embedded numbering skips those lines.
// The visible reads and writes of the four registration/state fields happen under synchronized (this).
162 * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
163 * acquire {@link #cleanupEntity}.
166 private Registration serviceEntityReg = null;
168 * Service (base) entity last reported state.
171 private EntityState serviceEntityState = EntityState.UNREGISTERED;
174 * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
178 private Registration cleanupEntityReg;
180 * Cleanup (owner) entity last reported state.
183 private EntityState cleanupEntityState = EntityState.UNREGISTERED;
// Checked by initialize() (must be false) and registerService() (must be true).
185 private volatile boolean initialized;
188 * Class constructor. Note: last argument is reused as-is.
190 * @param identifier non-empty string as identifier
191 * @param mainEntity as Entity instance
192 * @param closeEntity as Entity instance
193 * @param entityOwnershipService GenericEntityOwnershipService instance
194 * @param parent parent service
195 * @param services Services list
197 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
198 final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
199 checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
200 this.identifier = identifier;
201 this.entityOwnershipService = requireNonNull(entityOwnershipService);
202 serviceEntity = requireNonNull(mainEntity);
203 cleanupEntity = requireNonNull(closeEntity);
204 members.addAll(services);
206 LOG.debug("Instantiated new service group for {}", identifier);
/**
 * Creates a service group without any initial service registrations. Note that the ownership service is the
 * last argument here, unlike in the five-argument constructor this delegates to.
 *
 * @param identifier non-empty group identifier
 * @param mainEntity the service entity
 * @param closeEntity the cleanup entity
 * @param entityOwnershipService ownership service used for candidate registration
 */
ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity, final E closeEntity,
        final S entityOwnershipService) {
    this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of());
}
// Public accessor declared to return a String.
// NOTE(review): the body (embedded lines 217-218) is not visible in this excerpt; presumably it returns the
// `identifier` field -- confirm against the full file.
216 public String getIdentifier() {
// Starts shutdown of the whole group by calling destroyGroup() and holding on to the future it returns.
// NOTE(review): embedded lines 223-228 and 230-233 are not visible, so what happens between the destroyGroup()
// call and the debug log below, and what is finally returned, cannot be seen here. The log text suggests the
// state sync is deferred when it cannot be run immediately -- TODO confirm against the full file.
221 ListenableFuture<?> closeClusterSingletonGroup() {
222 final ListenableFuture<?> ret = destroyGroup();
229 LOG.debug("Service group {} postponing sync on close", identifier);
235 private boolean isClosed() {
236 return closeFuture.get() != null;
// Registers this node as a candidate for the service entity.
// Fails with IllegalStateException (checkState) when the group is already marked initialized; the checked
// CandidateAlreadyRegisteredException can come from registerCandidate().
// NOTE(review): embedded lines 241-242 and 248-253 are not visible; in particular the place where `initialized`
// is set and any lock handling around this body cannot be seen here.
// NOTE(review): "initilized" in the message below is misspelled; it is runtime text and is left untouched.
240 void initialize() throws CandidateAlreadyRegisteredException {
243 checkState(!initialized, "Singleton group %s was already initilized", identifier);
244 LOG.debug("Initializing service group {} with services {}", identifier, members);
245 synchronized (this) {
// The state is recorded as REGISTERED before the candidate registration call is made.
246 serviceEntityState = EntityState.REGISTERED;
247 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
255 private void checkNotClosed() {
256 checkState(!isClosed(), "Service group %s has already been closed", identifier);
// Adds a service registration to this group.
// The registration is first checked by verifyRegistration(); the group must already be initialized (checkState),
// and verify() fails if the registration is already present in `members`.
// NOTE(review): embedded lines 262-263, 265, 269-271 and 273-278 are not visible, so the steps between these
// statements (and the condition under which the "delayed register" message is logged) cannot be seen here.
260 void registerService(final ClusterSingletonServiceRegistration reg) {
261 final ClusterSingletonService service = verifyRegistration(reg);
264 checkState(initialized, "Service group %s is not initialized yet", identifier);
266 // First put the service
267 LOG.debug("Adding service {} to service group {}", service, identifier);
268 verify(members.add(reg));
272 LOG.debug("Service group {} delayed register of {}", identifier, reg);
// Removes a service registration from this group.
// verify() fails if the registration was not a member. When the removal leaves the group without members, the
// whole group is torn down through destroyGroup() and that future is returned.
// NOTE(review): embedded lines 282-283, 285, 292-296 and 298-300 are not visible; the return value for the
// non-empty case and the condition guarding the "delayed unregister" message cannot be seen here.
280 ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
281 verifyRegistration(reg);
284 verify(members.remove(reg));
286 if (members.isEmpty()) {
287 // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
288 // before we start applying state, because while we do not re-enter, the user is free to do whatever,
289 // notably including registering a service with the same ID from the service shutdown hook. That
290 // registration request needs to hit the successor of this group.
291 return destroyGroup();
297 LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
302 private ClusterSingletonService verifyRegistration(final ClusterSingletonServiceRegistration reg) {
303 final ClusterSingletonService service = reg.getInstance();
304 verify(identifier.equals(service.getIdentifier().getName()));
// Marks the group as closing and releases the service entity registration if one is still held.
// Runs under this object's monitor (synchronized method).
// NOTE(review): embedded lines 319-323 are not visible, so the tail of the method -- including the return for
// the first-caller path -- cannot be seen here.
308 private synchronized @NonNull ListenableFuture<?> destroyGroup() {
309 final SettableFuture<Void> future = SettableFuture.create();
// Only the first caller installs its future; later callers get the one that is already installed.
310 if (!closeFuture.compareAndSet(null, future)) {
311 return verifyNotNull(closeFuture.get());
314 if (serviceEntityReg != null) {
315 // We are still holding the service registration, close it now...
316 LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
317 serviceEntityReg.close();
318 serviceEntityReg = null;
// Entry point for entity ownership notifications: hands the change to lockedOwnershipChanged() while holding
// this object's monitor.
// NOTE(review): embedded lines 329-332 and 334-339 are not visible; what follows the synchronized block and the
// condition under which the "postponing" message is logged cannot be seen here.
326 void ownershipChanged(final E entity, final EntityOwnershipStateChange change, final boolean inJeopardy) {
327 synchronized (this) {
328 lockedOwnershipChanged(entity, change, inJeopardy);
333 LOG.debug("Service group {} postponing ownership change sync", identifier);
342 * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
343 * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
345 * @param change reported change
// Routes an ownership change to the handler for whichever of the two entities it concerns: the service entity
// goes to serviceOwnershipChanged(), the cleanup entity to cleanupCandidateOwnershipChanged(). An entity equal to
// neither is only logged at WARN.
// NOTE(review): embedded lines 352 and 355-356 are not visible; any statement following each handler call
// cannot be seen here.
348 private void lockedOwnershipChanged(final E entity, final EntityOwnershipStateChange change,
349 final boolean inJeopardy) {
350 if (serviceEntity.equals(entity)) {
351 serviceOwnershipChanged(change, inJeopardy);
353 } else if (cleanupEntity.equals(entity)) {
354 cleanupCandidateOwnershipChanged(change, inJeopardy);
357 LOG.warn("Group {} received unrecognized entity {}", identifier, entity);
// Records the latest reported state of the cleanup entity in cleanupEntityState.
// Two mappings are visible: one that turns local ownership into OWNED_JEOPARDY (with a warning) and one that
// turns it into OWNED; both map every loss/remote change to UNOWNED.
// NOTE(review): embedded lines 363, 368, 373-377, 381-382 and 387-389 are not visible. The condition selecting
// between the two mappings is among them -- presumably the `jeopardy` flag; confirm against the full file.
362 private void cleanupCandidateOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
// Mapping used when ownership is reported without certainty.
364 cleanupEntityState = switch (state) {
365 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> {
366 LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
367 yield EntityState.OWNED_JEOPARDY;
369 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
370 REMOTE_OWNERSHIP_LOST_NO_OWNER -> {
371 LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
372 yield EntityState.UNOWNED;
// The previous state is inspected before it is overwritten by the second mapping below.
378 if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
379 // Pair info message with previous jeopardy
380 LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
383 cleanupEntityState = switch (state) {
384 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED;
385 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_LOST_NO_OWNER,
386 REMOTE_OWNERSHIP_CHANGED -> EntityState.UNOWNED;
// Records the latest reported state of the service entity in serviceEntityState.
// Two mappings are visible: a switch expression yielding OWNED_JEOPARDY/UNOWNED and a statement-style switch
// assigning OWNED/UNOWNED.
// NOTE(review): embedded lines 392, 398-401, 405-407, 412, 419-420 and 422-423 are not visible. The condition
// selecting between the two mappings is among them -- presumably the `jeopardy` flag; confirm against the full
// file.
// NOTE(review): the WARN message at the end says "cleanup entity change" although this method handles the service
// entity; it looks copied from the cleanup handler. It is runtime text and is left untouched here.
391 private void serviceOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
393 LOG.info("Service group {} service entity ownership uncertain", identifier);
394 serviceEntityState = switch (state) {
395 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED_JEOPARDY;
396 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
397 REMOTE_OWNERSHIP_LOST_NO_OWNER -> EntityState.UNOWNED;
// The previous state is inspected before it is overwritten by the mapping below.
402 if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
403 // Pair info message with previous jeopardy
404 LOG.info("Service group {} service entity ownership ascertained", identifier);
408 case LOCAL_OWNERSHIP_GRANTED:
409 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
410 LOG.debug("Service group {} acquired service entity ownership", identifier);
411 serviceEntityState = EntityState.OWNED;
413 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
414 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
415 case REMOTE_OWNERSHIP_CHANGED:
416 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
417 LOG.debug("Service group {} lost service entity ownership", identifier);
418 serviceEntityState = EntityState.UNOWNED;
421 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
// Applies pending state changes, gated on the dirty marker being successfully cleared by conditionalClean().
// NOTE(review): embedded lines 428-429, 431-433, 435-436, 439, 442, 446-451, 453-455, 457 and 459-463 are not
// visible. The loop structure, the call made when the marker was dirty, the unlock and the re-check of the dirty
// marker that the comments describe cannot be seen here; the three debug messages below are the visible outcomes.
425 // has to be called with lock asserted, which will be released prior to returning
426 private void reconcileState() {
427 // Always check if there is any state change to be applied.
430 if (conditionalClean()) {
434 // We may have run a round of reconciliation, but any one of the following may have happened asynchronously:
437 // - service future completed
438 // - entity state changed
440 // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
441 // be dirty again. This closes the following race condition:
443 // A: runs these checks holding the lock
444 // B: modifies them, fails to acquire lock
445 // A: releases lock -> no one takes care of reconciliation
452 LOG.debug("Service group {} re-running reconciliation", identifier);
456 LOG.debug("Service group {} will be reconciled by someone else", identifier);
458 LOG.debug("Service group {} is completely reconciled", identifier);
// Invoked from both the onSuccess and onFailure callbacks that ensureStopping() attaches to the future returned by
// closeServiceInstance().
// NOTE(review): the body (embedded lines 466-470) is not visible in this excerpt.
465 private void serviceTransitionCompleted() {
// Runs one round of reconciliation: snapshots entity state and members under the monitor, then starts or stops
// services accordingly and, once nothing is running and the service entity is not held, releases the cleanup
// entity and possibly completes the close future.
// NOTE(review): embedded lines 483-487, 491, 499-504, 507, 509-513, 515-516, 519-521, 523, 532-533, 537-539 and
// 545-548 are not visible. Among them are the else-branches assigning haveService/haveCleanup, the `try` opening
// the registration attempt, the remainder of the catch block, the cleanup-state case that yields true, and the
// branch structure around ensureServicesStopping().
472 // Has to be called with lock asserted
473 private void tryReconcileState() {
474 // First take a safe snapshot of current state on which we will base our decisions.
475 final Set<ClusterSingletonServiceRegistration> localMembers;
476 final boolean haveCleanup;
477 final boolean haveService;
478 synchronized (this) {
// The service entity counts as held only while its registration exists and it is OWNED or OWNED_JEOPARDY.
479 if (serviceEntityReg != null) {
480 haveService = switch (serviceEntityState) {
481 case OWNED, OWNED_JEOPARDY -> true;
482 case REGISTERED, UNOWNED, UNREGISTERED -> false;
488 if (haveService && cleanupEntityReg == null) {
489 // We have the service entity but have not registered for cleanup entity. Do that now and retry.
490 LOG.debug("Service group {} registering cleanup entity", identifier);
492 cleanupEntityState = EntityState.REGISTERED;
493 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
494 } catch (CandidateAlreadyRegisteredException e) {
// Registration of the cleanup candidate failed: give up the service entity registration as well.
495 LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
496 if (serviceEntityReg != null) {
497 serviceEntityReg.close();
498 serviceEntityReg = null;
505 if (cleanupEntityReg != null) {
506 haveCleanup = switch (cleanupEntityState) {
508 case OWNED_JEOPARDY, REGISTERED, UNOWNED, UNREGISTERED -> false;
// Immutable copy, so the decisions below are not affected by concurrent changes to `members`.
514 localMembers = ImmutableSet.copyOf(members);
517 if (haveService && haveCleanup) {
518 ensureServicesStarting(localMembers);
522 ensureServicesStopping();
524 if (!haveService && services.isEmpty()) {
525 LOG.debug("Service group {} has no running services", identifier);
526 final boolean canFinishClose;
527 synchronized (this) {
528 if (cleanupEntityReg != null) {
529 LOG.debug("Service group {} releasing cleanup entity", identifier);
530 cleanupEntityReg.close();
531 cleanupEntityReg = null;
// Closing can only finish once the cleanup entity is reported UNOWNED or was never registered.
534 canFinishClose = switch (cleanupEntityState) {
535 case OWNED, OWNED_JEOPARDY, REGISTERED -> false;
536 case UNOWNED, UNREGISTERED -> true;
540 if (canFinishClose) {
// A null future means close was never requested; a done future was already completed.
541 final SettableFuture<Void> localFuture = closeFuture.get();
542 if (localFuture != null && !localFuture.isDone()) {
543 LOG.debug("Service group {} completing termination", identifier);
544 localFuture.set(null);
// Brings the tracked services in line with the given member snapshot: tracked services that are no longer in
// localConfig are passed to ensureStopping(), and members that are not yet tracked are started and recorded.
// NOTE(review): embedded lines 554, 566-571, 577-578, 582-585 and 587-589 are not visible. Among them are the
// handling of a null result from ensureStopping(), the `try` opening around instantiateServiceInstance() and the
// remainder of the catch block.
550 // Has to be called with lock asserted
551 @SuppressWarnings("illegalCatch")
552 private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
553 LOG.debug("Service group {} starting services", identifier);
555 // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
556 // example when this method is executed as part of unregisterService() call. In that case we need to ensure
557 // services in the list are stopping
558 final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
559 while (it.hasNext()) {
560 final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
561 final ClusterSingletonServiceRegistration reg = entry.getKey();
562 if (!localConfig.contains(reg)) {
563 final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
// A non-null result replaces the recorded info for this registration.
564 if (newInfo != null) {
565 entry.setValue(newInfo);
572 // Now make sure member services are being juggled around
573 for (ClusterSingletonServiceRegistration reg : localConfig) {
574 if (!services.containsKey(reg)) {
575 final ClusterSingletonService service = reg.getInstance();
576 LOG.debug("Starting service {}", service);
579 service.instantiateServiceInstance();
580 } catch (Exception e) {
// A failing start is logged and not rethrown from the visible code ("attempting to continue").
581 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
586 services.put(reg, ServiceInfo.started());
// Passes every tracked service to ensureStopping() and stores any non-null ServiceInfo it returns.
// NOTE(review): embedded lines 599-603 are not visible; the handling of a null result cannot be seen here.
591 // Has to be called with lock asserted
592 private void ensureServicesStopping() {
593 final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
594 while (it.hasNext()) {
595 final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
596 final ServiceInfo newInfo = ensureStopping(entry.getKey(), entry.getValue());
597 if (newInfo != null) {
598 entry.setValue(newInfo);
// Moves a single tracked service towards being stopped, based on the state recorded in `info`.
// One branch calls closeServiceInstance(), attaches a completion callback and returns the info moved to
// ServiceState.STOPPING; another checks whether the recorded future is done. An unrecognised state raises
// IllegalStateException.
// NOTE(review): embedded lines 608, 610, 613, 617-620, 622, 626-628, 632, 635, 638-641 and 643-644 are not visible.
// Among them are the case labels, the `try` opening around closeServiceInstance(), the remainder of the catch
// block and the return values of the second branch.
605 @SuppressWarnings("illegalCatch")
606 private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
607 switch (info.getState()) {
609 final ClusterSingletonService service = reg.getInstance();
611 LOG.debug("Service group {} stopping service {}", identifier, service);
612 final @NonNull ListenableFuture<?> future;
// verifyNotNull() rejects a null future returned by the service.
614 future = verifyNotNull(service.closeServiceInstance());
615 } catch (Exception e) {
616 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
// Both outcomes notify serviceTransitionCompleted(); with directExecutor() the callback runs on the thread that
// completes the future, or right here if the future is already complete.
621 Futures.addCallback(future, new FutureCallback<Object>() {
623 public void onSuccess(final Object result) {
624 LOG.debug("Service group {} service {} stopped successfully", identifier, service);
625 serviceTransitionCompleted();
629 public void onFailure(final Throwable cause) {
630 LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
631 serviceTransitionCompleted();
633 }, MoreExecutors.directExecutor());
634 return info.toState(ServiceState.STOPPING, future);
636 if (info.getFuture().isDone()) {
637 LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
642 throw new IllegalStateException("Unhandled state " + info.getState());
// NOTE(review): the body (embedded lines 647-648) is not visible in this excerpt; presumably it sets the `dirty`
// marker that conditionalClean() clears -- confirm against the full file.
646 private void markDirty() {
// NOTE(review): the body (embedded lines 651-652) is not visible in this excerpt; presumably it reports whether
// the `dirty` marker is currently set -- confirm against the full file.
650 private boolean isDirty() {
654 private boolean conditionalClean() {
655 return DIRTY_UPDATER.compareAndSet(this, 1, 0);
658 private boolean tryLock() {
659 return LOCK_UPDATER.compareAndSet(this, 0, 1);
// Releases the simplified lock by flipping the lock word from 1 back to 0; verify() fails if the lock word was
// not 1 at that point.
// NOTE(review): the return statement (embedded line 664) is not visible in this excerpt.
662 private boolean unlock() {
663 verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
668 public String toString() {
669 return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();