Adopt odlparent-10.0.0/yangtools-8.0.0-SNAPSHOT
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
index 82897e6c60366571ee430dd1e675b93194ca9fa8..8a283545b6f7c58152365b546c4183c4dcc538c1 100644 (file)
@@ -5,36 +5,50 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.mdsal.singleton.dom.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.CheckReturnValue;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entitiy Ownership Service. Since EOS is atomic
+ * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
  * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
  *
  * <p>
@@ -55,46 +69,45 @@ import org.slf4j.LoggerFactory;
  * @param <G> the GenericEntityOwnershipListener type
  * @param <S> the GenericEntityOwnershipService type
  */
-final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
+final class ClusterSingletonServiceGroupImpl<P extends HierarchicalIdentifier<P>, E extends GenericEntity<P>,
         C extends GenericEntityOwnershipChange<P, E>,  G extends GenericEntityOwnershipListener<P, C>,
         S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
-    private enum State {
+
+    private enum EntityState {
         /**
-         * This group has been freshly allocated and has not been started yet.
+         * This entity was never registered.
          */
-        INITIAL,
+        UNREGISTERED,
         /**
-         * Operational state. Service entity is registered, but ownership was not resolved yet.
+         * Registration exists, but we are waiting for it to resolve.
          */
         REGISTERED,
         /**
-         * Operational state. Service entity confirmed to be follower.
-         */
-        STANDBY,
-        /**
-         * Service entity acquired. Attempting to acquire cleanup entity.
-         */
-        TAKING_OWNERSHIP,
-        /**
-         * Both entities held and user services are being started.
+         * Registration indicated we are the owner.
          */
-        STARTING_SERVICES,
+        OWNED,
         /**
-         * Steady state. Both entities held and services have finished starting.
+         * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
+         * another partition, for example.
          */
-        OWNER,
+        OWNED_JEOPARDY,
         /**
-         * User services are being stopped due to either loss of an entity or a shutdown.
+         * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
+         * do not need an UNOWNED_JEOPARDY state.
          */
-        STOPPING_SERVICES,
+        UNOWNED,
+    }
+
+    enum ServiceState {
         /**
-         * We have stopped services and are now relinquishing the cleanup entity.
+         * Local service is up and running.
          */
-        RELEASING_OWNERSHIP,
+        // FIXME: we should support async startup, which will require a STARTING state.
+        STARTED,
         /**
-         * Terminated, this group cannot be used anymore.
+         * Local service is being stopped.
          */
-        TERMINATED
+        STOPPING,
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
@@ -106,23 +119,71 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
     private final E serviceEntity;
     private final E cleanupEntity;
 
-    private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
-    private final ReentrantLock lock = new ReentrantLock(true);
+    private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
+    // Guarded by lock
+    private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
 
-    @GuardedBy("lock")
-    private final List<ClusterSingletonService> serviceGroup;
+    // Marker for when any state changed
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
+    private volatile int dirty;
 
-    @GuardedBy("lock")
-    private State state = State.INITIAL;
+    // Simplified lock: non-reentrant, support tryLock() only
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
+    @SuppressWarnings("unused")
+    private volatile int lock;
 
-    @GuardedBy("lock")
-    private List<C> capture;
+    /*
+     * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
+     * - user calling close()
+     * - service entity ownership
+     * - cleanup entity ownership
+     * - service shutdown future
+     *
+     * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
+     * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
+     * due to boilerplate overhead.
+     *
+     * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
+     * based on recorded bits -- without explicit representation of state machine state.
+     */
+    /**
+     * Group close future. In can only go from null to non-null reference. Whenever it is non-null, it indicates that
+     * the user has closed the group and we are converging to termination.
+     */
+    // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
+    // as we perform a volatile read after unlocking -- that volatile read may easier on L1 cache.
+    // XXX: above needs a microbenchmark contention ever becomes a problem.
+    private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
+
+    /**
+     * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
+     * acquire {@link #cleanupEntity}.
+     */
+    @GuardedBy("this")
+    private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
+    /**
+     * Service (base) entity last reported state.
+     */
+    @GuardedBy("this")
+    private EntityState serviceEntityState = EntityState.UNREGISTERED;
 
-    /* EOS Candidate Registrations */
-    @GuardedBy("lock")
-    private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
-    @GuardedBy("lock")
+    /**
+     * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
+     * and startup.
+     */
+    @GuardedBy("this")
     private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
+    /**
+     * Cleanup (owner) entity last reported state.
+     */
+    @GuardedBy("this")
+    private EntityState cleanupEntityState = EntityState.UNREGISTERED;
+
+    private volatile boolean initialized;
 
     /**
      * Class constructor. Note: last argument is reused as-is.
@@ -135,20 +196,21 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      * @param services Services list
      */
     ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
-            final E closeEntity, final List<ClusterSingletonService> services) {
-        Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+            final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
+        checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
         this.identifier = identifier;
-        this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
-        this.serviceEntity = Preconditions.checkNotNull(mainEntity);
-        this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
-        this.serviceGroup = Preconditions.checkNotNull(services);
+        this.entityOwnershipService = requireNonNull(entityOwnershipService);
+        this.serviceEntity = requireNonNull(mainEntity);
+        this.cleanupEntity = requireNonNull(closeEntity);
+        members.addAll(services);
+
         LOG.debug("Instantiated new service group for {}", identifier);
     }
 
     @VisibleForTesting
     ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
             final E closeEntity, final S entityOwnershipService) {
-        this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+        this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of());
     }
 
     @Override
@@ -158,403 +220,510 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
 
     @Override
     ListenableFuture<?> closeClusterSingletonGroup() {
-        // Assert our future first
-        final SettableFuture<Void> future = SettableFuture.create();
-        final SettableFuture<Void> existing = closeFuture.getAndSet(future);
-        if (existing != null) {
-            return existing;
-        }
-
-        if (!lock.tryLock()) {
-            // The lock is held, the cleanup will be finished by the owner thread
-            LOG.debug("Singleton group {} cleanup postponed", identifier);
-            return future;
-        }
+        final ListenableFuture<?> ret = destroyGroup();
+        members.clear();
+        markDirty();
 
-        try {
-            lockedClose(future);
-        } finally {
-            lock.unlock();
+        if (tryLock()) {
+            reconcileState();
+        } else {
+            LOG.debug("Service group {} postponing sync on close", identifier);
         }
 
-        LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
-        return future;
+        return ret;
     }
 
     private boolean isClosed() {
         return closeFuture.get() != null;
     }
 
-    @GuardedBy("lock")
-    private void updateState(final State newState) {
-        LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
-        state = Verify.verifyNotNull(newState);
-    }
-
-    @GuardedBy("lock")
-    private void lockedClose(final SettableFuture<Void> future) {
-        if (serviceEntityReg != null) {
-            LOG.debug("Service group {} unregistering", identifier);
-            serviceEntityReg.close();
-            serviceEntityReg = null;
-        }
-
-        switch (state) {
-            case INITIAL:
-                // Not started: not much to do
-                terminate(future);
-                break;
-            case TERMINATED:
-                // Already done: no-op
-                break;
-            case REGISTERED:
-            case STANDBY:
-                LOG.debug("Service group {} terminated", identifier);
-                terminate(future);
-                break;
-            case OWNER:
-                // No-op, we will react to the loss of registration instead.
-                break;
-            case STOPPING_SERVICES:
-                // Waiting for services. Will resume once we get notified.
-                break;
-            case RELEASING_OWNERSHIP:
-                // Waiting for cleanup entity to flip, will resume afterwards.
-                break;
-            case TAKING_OWNERSHIP:
-                // Abort taking of ownership and close
-                LOG.debug("Service group {} aborting ownership bid", identifier);
-                cleanupEntityReg.close();
-                cleanupEntityReg = null;
-                updateState(State.RELEASING_OWNERSHIP);
-                break;
-            default:
-                throw new IllegalStateException("Unhandled state " + state);
-        }
-    }
-
-    @GuardedBy("lock")
-    private void terminate(final SettableFuture<Void> future) {
-        updateState(State.TERMINATED);
-        Verify.verify(future.set(null));
-    }
-
     @Override
     void initialize() throws CandidateAlreadyRegisteredException {
-        LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
-
-        lock.lock();
+        verify(tryLock());
         try {
-            Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
-                    state);
-
-            // Catch events if they fire during this call
-            capture = new ArrayList<>(0);
-            serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
-            state = State.REGISTERED;
-
-            final List<C> captured = capture;
-            capture = null;
-            captured.forEach(this::lockedOwnershipChanged);
+            checkState(!initialized, "Singleton group %s was already initilized", identifier);
+            LOG.debug("Initializing service group {} with services {}", identifier, members);
+            synchronized (this) {
+                serviceEntityState = EntityState.REGISTERED;
+                serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+                initialized = true;
+            }
         } finally {
-            lock.unlock();
+            unlock();
         }
     }
 
     private void checkNotClosed() {
-        Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
-                identifier);
+        checkState(!isClosed(), "Service group %s has already been closed", identifier);
     }
 
     @Override
-    void registerService(final ClusterSingletonService service) {
-        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+    void registerService(final ClusterSingletonServiceRegistration reg) {
+        final ClusterSingletonService service = verifyRegistration(reg);
         checkNotClosed();
 
-        LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
+        checkState(initialized, "Service group %s is not initialized yet", identifier);
 
-        lock.lock();
-        try {
-            Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
-            serviceGroup.add(service);
+        // First put the service
+        LOG.debug("Adding service {} to service group {}", service, identifier);
+        verify(members.add(reg));
+        markDirty();
 
-            switch (state) {
-                case OWNER:
-                case STARTING_SERVICES:
-                    service.instantiateServiceInstance();
-                    break;
-                default:
-                    break;
-            }
-        } finally {
-            lock.unlock();
+        if (!tryLock()) {
+            LOG.debug("Service group {} delayed register of {}", identifier, reg);
+            return;
         }
+
+        reconcileState();
     }
 
-    @CheckReturnValue
     @Override
-    boolean unregisterService(final ClusterSingletonService service) {
-        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+    ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
+        verifyRegistration(reg);
         checkNotClosed();
 
-        lock.lock();
-        try {
-            // There is a slight problem here, as the type does not match the list type, hence we need to tread
-            // carefully.
-            if (serviceGroup.size() == 1) {
-                Verify.verify(serviceGroup.contains(service));
-                return true;
-            }
+        verify(members.remove(reg));
+        markDirty();
+        if (members.isEmpty()) {
+            // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
+            // before we start applying state, because while we do not re-enter, the user is free to do whatever,
+            // notably including registering a service with the same ID from the service shutdown hook. That
+            // registration request needs to hit the successor of this group.
+            return destroyGroup();
+        }
 
-            Verify.verify(serviceGroup.remove(service));
-            LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+        if (tryLock()) {
+            reconcileState();
+        } else {
+            LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
+        }
+        return null;
+    }
 
-            switch (state) {
-                case OWNER:
-                case STARTING_SERVICES:
-                    service.closeServiceInstance();
-                    break;
-                default:
-                    break;
-            }
+    private ClusterSingletonService verifyRegistration(final ClusterSingletonServiceRegistration reg) {
+        final ClusterSingletonService service = reg.getInstance();
+        verify(identifier.equals(service.getIdentifier().getName()));
+        return service;
+    }
 
-            return false;
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
+    private synchronized @NonNull ListenableFuture<?> destroyGroup() {
+        final SettableFuture<Void> future = SettableFuture.create();
+        if (!closeFuture.compareAndSet(null, future)) {
+            return verifyNotNull(closeFuture.get());
         }
+
+        if (serviceEntityReg != null) {
+            // We are still holding the service registration, close it now...
+            LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
+            serviceEntityReg.close();
+            serviceEntityReg = null;
+        }
+
+        markDirty();
+        return future;
     }
 
     @Override
     void ownershipChanged(final C ownershipChange) {
         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
 
-        lock.lock();
-        try {
-            if (capture != null) {
-                capture.add(ownershipChange);
-            } else {
-                lockedOwnershipChanged(ownershipChange);
+        synchronized (this) {
+            lockedOwnershipChanged(ownershipChange);
+        }
+
+        if (isDirty()) {
+            if (!tryLock()) {
+                LOG.debug("Service group {} postponing ownership change sync", identifier);
+                return;
             }
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
+
+            reconcileState();
         }
     }
 
+    /**
+     * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
+     * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
+     *
+     * @param ownershipChange reported change
+     */
+    @Holding("this")
     private void lockedOwnershipChanged(final C ownershipChange) {
-        if (ownershipChange.inJeopardy()) {
-            LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
-            lostOwnership();
-            return;
-        }
-
         final E entity = ownershipChange.getEntity();
         if (serviceEntity.equals(entity)) {
-            serviceOwnershipChanged(ownershipChange);
+            serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+            markDirty();
         } else if (cleanupEntity.equals(entity)) {
-            cleanupCandidateOwnershipChanged(ownershipChange);
+            cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+            markDirty();
         } else {
             LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
         }
     }
 
-    private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
-        switch (ownershipChange.getState()) {
+    @Holding("this")
+    private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+        if (jeopardy) {
+            switch (state) {
+                case LOCAL_OWNERSHIP_GRANTED:
+                case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+                    LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
+                    cleanupEntityState = EntityState.OWNED_JEOPARDY;
+                    break;
+                case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+                case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+                case REMOTE_OWNERSHIP_CHANGED:
+                case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+                    LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
+                    cleanupEntityState = EntityState.UNOWNED;
+                    break;
+                default:
+                    throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
+            }
+
+            return;
+        }
+
+        if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
+            // Pair info message with previous jeopardy
+            LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
+        }
+
+        switch (state) {
             case LOCAL_OWNERSHIP_GRANTED:
-                switch (state) {
-                    case TAKING_OWNERSHIP:
-                        // SLAVE to MASTER
-                        startServices();
-                        return;
-                    default:
-                        break;
-                }
+            case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+                cleanupEntityState = EntityState.OWNED;
                 break;
             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
-                switch (state) {
-                    case RELEASING_OWNERSHIP:
-                        // Slight cheat: if we are closing down, we just need to notify the future
-                        updateState(isClosed() ? State.INITIAL : State.STANDBY);
-                        return;
-                    case STARTING_SERVICES:
-                    case OWNER:
-                    case TAKING_OWNERSHIP:
-                        LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
-                        return;
-                    default:
-                        break;
-                }
-
-                break;
-            case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
-            case REMOTE_OWNERSHIP_CHANGED:
             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+            case REMOTE_OWNERSHIP_CHANGED:
+                cleanupEntityState = EntityState.UNOWNED;
+                break;
             default:
+                LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
                 break;
         }
-
-        LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
     }
 
-    private void serviceOwnershipChanged(final C ownershipChange) {
-        switch (ownershipChange.getState()) {
+    @Holding("this")
+    private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+        if (jeopardy) {
+            LOG.info("Service group {} service entity ownership uncertain", identifier);
+            switch (state) {
+                case LOCAL_OWNERSHIP_GRANTED:
+                case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+                    serviceEntityState = EntityState.OWNED_JEOPARDY;
+                    break;
+                case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+                case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+                case REMOTE_OWNERSHIP_CHANGED:
+                case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+                    serviceEntityState = EntityState.UNOWNED;
+                    break;
+                default:
+                    throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
+            }
+            return;
+        }
+
+        if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
+            // Pair info message with previous jeopardy
+            LOG.info("Service group {} service entity ownership ascertained", identifier);
+        }
+
+        switch (state) {
             case LOCAL_OWNERSHIP_GRANTED:
-                // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
-                takeOwnership();
+            case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+                LOG.debug("Service group {} acquired service entity ownership", identifier);
+                serviceEntityState = EntityState.OWNED;
                 break;
             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
-                // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
-                lostOwnership();
+            case REMOTE_OWNERSHIP_CHANGED:
+            case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+                LOG.debug("Service group {} lost service entity ownership", identifier);
+                serviceEntityState = EntityState.UNOWNED;
                 break;
             default:
-                // Not needed notifications
-                LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
-                    ownershipChange);
+                LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
         }
     }
 
-    private void finishCloseIfNeeded() {
-        final SettableFuture<Void> future = closeFuture.get();
-        if (future != null) {
-            lock.lock();
+    // has to be called with lock asserted, which will be released prior to returning
+    private void reconcileState() {
+        // Always check if there is any state change to be applied.
+        while (true) {
             try {
-                lockedClose(future);
+                if (conditionalClean()) {
+                    tryReconcileState();
+                }
             } finally {
-                lock.unlock();
+                // We may have ran a round of reconciliation, but the either one of may have happened asynchronously:
+                // - registration
+                // - unregistration
+                // - service future completed
+                // - entity state changed
+                //
+                // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
+                // be dirty again. This closes the following race condition:
+                //
+                // A: runs these checks holding the lock
+                // B: modifies them, fails to acquire lock
+                // A: releases lock -> noone takes care of reconciliation
+
+                unlock();
             }
-        }
-    }
 
-    /*
-     * Help method to registered DoubleCandidateEntity. It is first step
-     * before the actual instance take Leadership.
-     */
-    private void takeOwnership() {
-        if (isClosed()) {
-            LOG.debug("Service group {} is closed, not taking ownership", identifier);
-            return;
-        }
+            if (isDirty()) {
+                if (tryLock()) {
+                    LOG.debug("Service group {} re-running reconciliation", identifier);
+                    continue;
+                }
 
-        LOG.debug("Group {} taking ownership", identifier);
+                LOG.debug("Service group {} will be reconciled by someone else", identifier);
+            } else {
+                LOG.debug("Service group {} is completely reconciled", identifier);
+            }
 
-        updateState(State.TAKING_OWNERSHIP);
-        try {
-            cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
-        } catch (CandidateAlreadyRegisteredException e) {
-            LOG.error("Service group {} failed to take ownership", identifier, e);
+            break;
         }
     }
 
-    /*
-     * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void startServices() {
-        if (isClosed()) {
-            LOG.debug("Service group {} is closed, not starting services", identifier);
-            return;
+    private void serviceTransitionCompleted() {
+        markDirty();
+        if (tryLock()) {
+            reconcileState();
         }
+    }
 
-        LOG.debug("Service group {} starting services", identifier);
-        serviceGroup.forEach(service -> {
-            LOG.debug("Starting service {}", service);
-            try {
-                service.instantiateServiceInstance();
-            } catch (Exception e) {
-                LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+    // Has to be called with lock asserted
+    private void tryReconcileState() {
+        // First take a safe snapshot of current state on which we will base our decisions.
+        final Set<ClusterSingletonServiceRegistration> localMembers;
+        final boolean haveCleanup;
+        final boolean haveService;
+        synchronized (this) {
+            if (serviceEntityReg != null) {
+                switch (serviceEntityState) {
+                    case OWNED:
+                    case OWNED_JEOPARDY:
+                        haveService = true;
+                        break;
+                    case REGISTERED:
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        haveService = false;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+                }
+            } else {
+                haveService = false;
             }
-        });
 
-        LOG.debug("Service group {} services started", identifier);
-        updateState(State.OWNER);
-    }
+            if (haveService && cleanupEntityReg == null) {
+                // We have the service entity but have not registered for cleanup entity. Do that now and retry.
+                LOG.debug("Service group {} registering cleanup entity", identifier);
+                try {
+                    cleanupEntityState = EntityState.REGISTERED;
+                    cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+                } catch (CandidateAlreadyRegisteredException e) {
+                    LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
+                    if (serviceEntityReg != null) {
+                        serviceEntityReg.close();
+                        serviceEntityReg = null;
+                    }
+                }
+                markDirty();
+                return;
+            }
 
-    /*
-     * Help method calls suspendService method for stop this single cluster-wide service instance.
-     * The last async. step has to close DoubleCandidateRegistration reference what should initialize
-     * new election for DoubleCandidateEntity.
-     */
-    private void lostOwnership() {
-        LOG.debug("Service group {} lost ownership in state {}", identifier, state);
-        switch (state) {
-            case REGISTERED:
-                updateState(State.STANDBY);
-                break;
-            case OWNER:
-                stopServices();
-                break;
-            case STARTING_SERVICES:
-            case STOPPING_SERVICES:
-                // No-op, as these will re-check state before proceeding
-                break;
-            case TAKING_OWNERSHIP:
-                cleanupEntityReg.close();
-                cleanupEntityReg = null;
-                updateState(State.STANDBY);
-                break;
-            case INITIAL:
-            case TERMINATED:
-            default:
-                LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
-                break;
+            if (cleanupEntityReg != null) {
+                switch (cleanupEntityState) {
+                    case OWNED:
+                        haveCleanup = true;
+                        break;
+                    case OWNED_JEOPARDY:
+                    case REGISTERED:
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        haveCleanup = false;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+                }
+            } else {
+                haveCleanup = false;
+            }
+
+            localMembers = ImmutableSet.copyOf(members);
         }
-    }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    void stopServices() {
-        updateState(State.STOPPING_SERVICES);
+        if (haveService && haveCleanup) {
+            ensureServicesStarting(localMembers);
+            return;
+        }
 
-        final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
-        for (final ClusterSingletonService service : serviceGroup) {
-            final ListenableFuture<Void> future;
+        ensureServicesStopping();
 
-            try {
-                future = service.closeServiceInstance();
-            } catch (Exception e) {
-                LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
-                    service, e);
-                continue;
+        if (!haveService && services.isEmpty()) {
+            LOG.debug("Service group {} has no running services", identifier);
+            final boolean canFinishClose;
+            synchronized (this) {
+                if (cleanupEntityReg != null) {
+                    LOG.debug("Service group {} releasing cleanup entity", identifier);
+                    cleanupEntityReg.close();
+                    cleanupEntityReg = null;
+                }
+
+                switch (cleanupEntityState) {
+                    case OWNED:
+                    case OWNED_JEOPARDY:
+                    case REGISTERED:
+                        // When we are registered we need to wait for registration to resolve, otherwise
+                        // the notification could be routed to the next incarnation of this group -- which could be
+                        // confused by the fact it is not registered, but receives, for example, OWNED notification.
+                        canFinishClose = false;
+                        break;
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        canFinishClose = true;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
+                }
             }
 
-            serviceCloseFutureList.add(future);
+            if (canFinishClose) {
+                final SettableFuture<Void> localFuture = closeFuture.get();
+                if (localFuture != null && !localFuture.isDone()) {
+                    LOG.debug("Service group {} completing termination", identifier);
+                    localFuture.set(null);
+                }
+            }
         }
+    }
 
-        Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
-            @Override
-            public void onFailure(final Throwable cause) {
-                LOG.warn("Service group {} service stopping reported error", identifier, cause);
-                onServicesStopped();
+    // Has to be called with lock asserted
+    @SuppressWarnings("illegalCatch")
+    private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
+        LOG.debug("Service group {} starting services", identifier);
+
+        // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
+        // example when this method is executed as part of unregisterService() call. In that case we need to ensure
+        // services in the list are stopping
+        final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+        while (it.hasNext()) {
+            final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+            final ClusterSingletonServiceRegistration reg = entry.getKey();
+            if (!localConfig.contains(reg)) {
+                final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
+                if (newInfo != null) {
+                    entry.setValue(newInfo);
+                } else {
+                    it.remove();
+                }
             }
+        }
+
+        // Now make sure member services are being juggled around
+        for (ClusterSingletonServiceRegistration reg : localConfig) {
+            if (!services.containsKey(reg)) {
+                final ClusterSingletonService service = reg.getInstance();
+                LOG.debug("Starting service {}", service);
+
+                try {
+                    service.instantiateServiceInstance();
+                } catch (Exception e) {
+                    LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
+                        e);
+                    continue;
+                }
 
-            @Override
-            public void onSuccess(final List<Void> nulls) {
-                onServicesStopped();
+                services.put(reg, ServiceInfo.started());
             }
-        });
+        }
     }
 
-    void onServicesStopped() {
-        LOG.debug("Service group {} finished stopping services", identifier);
-        lock.lock();
-        try {
-            if (cleanupEntityReg != null) {
-                updateState(State.RELEASING_OWNERSHIP);
-                cleanupEntityReg.close();
-                cleanupEntityReg = null;
+    // Has to be called with lock asserted
+    private void ensureServicesStopping() {
+        final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+        while (it.hasNext()) {
+            final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+            final ServiceInfo newInfo = ensureStopping(entry.getKey(), entry.getValue());
+            if (newInfo != null) {
+                entry.setValue(newInfo);
             } else {
-                updateState(State.STANDBY);
+                it.remove();
             }
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
         }
     }
 
+    @SuppressWarnings("illegalCatch")
+    private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
+        switch (info.getState()) {
+            case STARTED:
+                final ClusterSingletonService service = reg.getInstance();
+
+                LOG.debug("Service group {} stopping service {}", identifier, service);
+                final @NonNull ListenableFuture<?> future;
+                try {
+                    future = verifyNotNull(service.closeServiceInstance());
+                } catch (Exception e) {
+                    LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
+                        e);
+                    return null;
+                }
+
+                Futures.addCallback(future, new FutureCallback<Object>() {
+                    @Override
+                    public void onSuccess(final Object result) {
+                        LOG.debug("Service group {} service {} stopped successfully", identifier, service);
+                        serviceTransitionCompleted();
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable cause) {
+                        LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
+                        serviceTransitionCompleted();
+                    }
+                }, MoreExecutors.directExecutor());
+                return info.toState(ServiceState.STOPPING, future);
+            case STOPPING:
+                if (info.getFuture().isDone()) {
+                    LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
+                    return null;
+                }
+                return info;
+            default:
+                throw new IllegalStateException("Unhandled state " + info.getState());
+        }
+    }
+
+    private void markDirty() {
+        dirty = 1;
+    }
+
+    private boolean isDirty() {
+        return dirty != 0;
+    }
+
+    private boolean conditionalClean() {
+        return DIRTY_UPDATER.compareAndSet(this, 1, 0);
+    }
+
+    private boolean tryLock() {
+        return LOCK_UPDATER.compareAndSet(this, 0, 1);
+    }
+
+    private boolean unlock() {
+        verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
+        return true;
+    }
+
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
+        return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();
     }
 }