Rework ClusterSingletonServiceGroupImpl locking
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
index 3594d64f2248d7f633e9b0846054cb4253691e39..7570a4bbfeb3e08a19078c31af97eec5e5905db5 100644 (file)
@@ -5,25 +5,36 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.mdsal.singleton.dom.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.CheckReturnValue;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
@@ -89,18 +100,14 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
         UNOWNED,
     }
 
-    private enum ServiceState {
-        /**
-         * Local services are stopped.
-         */
-        STOPPED,
+    enum ServiceState {
         /**
-         * Local services are up and running.
+         * Local service is up and running.
          */
         // FIXME: we should support async startup, which will require a STARTING state.
         STARTED,
         /**
-         * Local services are being stopped.
+         * Local service is being stopped.
          */
         STOPPING,
     }
@@ -114,10 +121,22 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
     private final E serviceEntity;
     private final E cleanupEntity;
 
-    private final ReentrantLock lock = new ReentrantLock(true);
+    private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
+    // Guarded by lock
+    private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
 
-    @GuardedBy("lock")
-    private final List<ClusterSingletonServiceRegistration> serviceGroup;
+    // Marker for when any state changed
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
+    private volatile int dirty;
+
+    // Simplified lock: non-reentrant, supports tryLock() only
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
+    @SuppressWarnings("unused")
+    private volatile int lock;
 
     /*
      * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
@@ -146,39 +165,27 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
      * acquire {@link #cleanupEntity}.
      */
-    @GuardedBy("lock")
+    @GuardedBy("this")
     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
     /**
      * Service (base) entity last reported state.
      */
-    @GuardedBy("lock")
+    @GuardedBy("this")
     private EntityState serviceEntityState = EntityState.UNREGISTERED;
 
     /**
      * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
      * and startup.
      */
-    @GuardedBy("lock")
+    @GuardedBy("this")
     private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
     /**
      * Cleanup (owner) entity last reported state.
      */
-    @GuardedBy("lock")
+    @GuardedBy("this")
     private EntityState cleanupEntityState = EntityState.UNREGISTERED;
 
-    /**
-     * Optional event capture list. This field is initialized when we interact with entity ownership service, to capture
-     * events reported during EOS method invocation -- like immediate acquisition of entity when we register it. This
-     * prevents bugs from recursion.
-     */
-    @GuardedBy("lock")
-    private List<C> capture = null;
-
-    /**
-     * State of local services.
-     */
-    @GuardedBy("lock")
-    private ServiceState localServicesState = ServiceState.STOPPED;
+    private volatile boolean initialized;
 
     /**
      * Class constructor. Note: last argument is reused as-is.
@@ -191,20 +198,21 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      * @param services Services list
      */
     ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
-            final E closeEntity, final List<ClusterSingletonServiceRegistration> services) {
-        Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+            final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
+        checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
         this.identifier = identifier;
-        this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
-        this.serviceEntity = Preconditions.checkNotNull(mainEntity);
-        this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
-        this.serviceGroup = Preconditions.checkNotNull(services);
+        this.entityOwnershipService = requireNonNull(entityOwnershipService);
+        this.serviceEntity = requireNonNull(mainEntity);
+        this.cleanupEntity = requireNonNull(closeEntity);
+        members.addAll(services);
+
         LOG.debug("Instantiated new service group for {}", identifier);
     }
 
     @VisibleForTesting
     ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
             final E closeEntity, final S entityOwnershipService) {
-        this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+        this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of());
     }
 
     @Override
@@ -214,230 +222,121 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
 
     @Override
     ListenableFuture<?> closeClusterSingletonGroup() {
-        // Assert our future first
-        final SettableFuture<Void> future = SettableFuture.create();
-        final SettableFuture<Void> existing = closeFuture.getAndSet(future);
-        if (existing != null) {
-            return existing;
-        }
+        final ListenableFuture<?> ret = destroyGroup();
+        members.clear();
+        markDirty();
 
-        if (!lock.tryLock()) {
-            // The lock is held, the cleanup will be finished by the owner thread
-            LOG.debug("Singleton group {} cleanup postponed", identifier);
-            return future;
-        }
-
-        try {
-            lockedClose(future);
-        } finally {
-            lock.unlock();
+        if (tryLock()) {
+            reconcileState();
+        } else {
+            LOG.debug("Service group {} postponing sync on close", identifier);
         }
 
-        LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
-        return future;
+        return ret;
     }
 
     private boolean isClosed() {
         return closeFuture.get() != null;
     }
 
-    @GuardedBy("lock")
-    private void startCapture() {
-        Verify.verify(capture == null, "Service group {} is already capturing events {}", identifier, capture);
-        capture = new ArrayList<>(0);
-        LOG.debug("Service group {} started capturing events", identifier);
-    }
-
-    private List<C> endCapture() {
-        final List<C> ret = Verify.verifyNotNull(capture, "Service group {} is not currently capturing", identifier);
-        capture = null;
-        LOG.debug("Service group {} finished capturing events, {} events to process", identifier, ret.size());
-        return ret;
-    }
-
-    @GuardedBy("lock")
-    private void lockedClose(final SettableFuture<Void> future) {
-        if (serviceEntityReg != null) {
-            // We are still holding the service registration, close it now...
-            LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
-            startCapture();
-            serviceEntityReg.close();
-            serviceEntityReg = null;
-
-            // This can potentially mutate our state, so all previous checks need to be re-validated.
-            endCapture().forEach(this::lockedOwnershipChanged);
-        }
-
-        // Now check service entity state: if it is still owned, we need to wait until it is acknowledged as
-        // unregistered.
-        switch (serviceEntityState) {
-            case REGISTERED:
-            case UNOWNED:
-            case UNREGISTERED:
-                // We have either successfully shut down, or have never started up, proceed with termination
-                break;
-            case OWNED:
-                // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
-                // when that loss is reported.
-                LOG.debug("Service group {} is still owned, postponing termination", identifier);
-                return;
-            case OWNED_JEOPARDY:
-                // This is a significant event, as it relates to cluster split/join operations, operators need to know
-                // we are waiting for cluster join event.
-                LOG.info("Service group {} is still owned with split cluster, postponing termination", identifier);
-                return;
-            default:
-                throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
-        }
-
-        // We do not own service entity state: we need to ensure services are stopped.
-        if (stopServices()) {
-            LOG.debug("Service group {} started shutting down services, postponing termination", identifier);
-            return;
-        }
-
-        // Local cleanup completed, release cleanup entity if needed
-        if (cleanupEntityReg != null) {
-            LOG.debug("Service group {} unregistering cleanup entity {}", identifier, cleanupEntity);
-            startCapture();
-            cleanupEntityReg.close();
-            cleanupEntityReg = null;
-
-            // This can potentially mutate our state, so all previous checks need to be re-validated.
-            endCapture().forEach(this::lockedOwnershipChanged);
-        }
-
-        switch (cleanupEntityState) {
-            case REGISTERED:
-            case UNOWNED:
-            case UNREGISTERED:
-                // We have either successfully shut down, or have never started up, proceed with termination
-                break;
-            case OWNED:
-                // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
-                // when that loss is reported.
-                LOG.debug("Service group {} is still owns cleanup, postponing termination", identifier);
-                return;
-            case OWNED_JEOPARDY:
-                // This is a significant event, as it relates to cluster split/join operations, operators need to know
-                // we are waiting for cluster join event.
-                LOG.info("Service group {} is still owns cleanup with split cluster, postponing termination",
-                    identifier);
-                return;
-            default:
-                throw new IllegalStateException("Unhandled cleanup entity state " + serviceEntityState);
-        }
-
-        // No registrations left and no service operations pending, we are done
-        LOG.debug("Service group {} completing termination", identifier);
-        future.set(null);
-    }
-
     @Override
     void initialize() throws CandidateAlreadyRegisteredException {
-        lock.lock();
+        verify(tryLock());
         try {
-            Preconditions.checkState(serviceEntityState == EntityState.UNREGISTERED,
-                    "Singleton group %s was already initilized", identifier);
-
-            LOG.debug("Initializing service group {} with services {}", identifier, serviceGroup);
-            startCapture();
-            serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
-            serviceEntityState = EntityState.REGISTERED;
-            endCapture().forEach(this::lockedOwnershipChanged);
+            checkState(!initialized, "Singleton group %s was already initilized", identifier);
+            LOG.debug("Initializing service group {} with services {}", identifier, members);
+            synchronized (this) {
+                serviceEntityState = EntityState.REGISTERED;
+                serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+                initialized = true;
+            }
         } finally {
-            lock.unlock();
+            unlock();
         }
     }
 
     private void checkNotClosed() {
-        Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
-                identifier);
+        checkState(!isClosed(), "Service group %s has already been closed", identifier);
     }
 
     @Override
     void registerService(final ClusterSingletonServiceRegistration reg) {
         final ClusterSingletonService service = reg.getInstance();
-        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+        verify(identifier.equals(service.getIdentifier().getValue()));
         checkNotClosed();
 
-        lock.lock();
-        try {
-            Preconditions.checkState(serviceEntityState != EntityState.UNREGISTERED,
-                    "Service group %s is not initialized yet", identifier);
+        checkState(initialized, "Service group %s is not initialized yet", identifier);
 
-            LOG.debug("Adding service {} to service group {}", service, identifier);
-            serviceGroup.add(reg);
+        // First put the service
+        LOG.debug("Adding service {} to service group {}", service, identifier);
+        verify(members.add(reg));
+        markDirty();
 
-            switch (localServicesState) {
-                case STARTED:
-                    LOG.debug("Service group {} starting late-registered service {}", identifier, service);
-                    service.instantiateServiceInstance();
-                    break;
-                case STOPPED:
-                case STOPPING:
-                    break;
-                default:
-                    throw new IllegalStateException("Unhandled local services state " + localServicesState);
-            }
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
+        if (!tryLock()) {
+            LOG.debug("Service group {} delayed register of {}", identifier, reg);
+            return;
         }
+
+        reconcileState();
     }
 
     @CheckReturnValue
     @Override
-    boolean unregisterService(final ClusterSingletonServiceRegistration reg) {
+    ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
         final ClusterSingletonService service = reg.getInstance();
-        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+        verify(identifier.equals(service.getIdentifier().getValue()));
         checkNotClosed();
 
-        lock.lock();
-        try {
-            // There is a slight problem here, as the type does not match the list type, hence we need to tread
-            // carefully.
-            if (serviceGroup.size() == 1) {
-                Verify.verify(serviceGroup.contains(reg));
-                return true;
-            }
+        verify(members.remove(reg));
+        markDirty();
+        if (members.isEmpty()) {
+            // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shut down
+            // before we start applying state, because while we do not re-enter, the user is free to do whatever,
+            // notably including registering a service with the same ID from the service shutdown hook. That
+            // registration request needs to hit the successor of this group.
+            return destroyGroup();
+        }
 
-            Verify.verify(serviceGroup.remove(reg));
-            LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+        if (tryLock()) {
+            reconcileState();
+        } else {
+            LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
+        }
+        return null;
+    }
 
-            switch (localServicesState) {
-                case STARTED:
-                    LOG.warn("Service group {} stopping unregistered service {}", identifier, service);
-                    service.closeServiceInstance();
-                    break;
-                case STOPPED:
-                case STOPPING:
-                    break;
-                default:
-                    throw new IllegalStateException("Unhandled local services state " + localServicesState);
-            }
+    private synchronized @NonNull ListenableFuture<?> destroyGroup() {
+        final SettableFuture<Void> future = SettableFuture.create();
+        if (!closeFuture.compareAndSet(null, future)) {
+            return verifyNotNull(closeFuture.get());
+        }
 
-            return false;
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
+        if (serviceEntityReg != null) {
+            // We are still holding the service registration, close it now...
+            LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
+            serviceEntityReg.close();
+            serviceEntityReg = null;
         }
+
+        markDirty();
+        return future;
     }
 
     @Override
     void ownershipChanged(final C ownershipChange) {
         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
 
-        lock.lock();
-        try {
-            if (capture != null) {
-                capture.add(ownershipChange);
-            } else {
-                lockedOwnershipChanged(ownershipChange);
+        synchronized (this) {
+            lockedOwnershipChanged(ownershipChange);
+        }
+
+        if (isDirty()) {
+            if (!tryLock()) {
+                LOG.debug("Service group {} postponing ownership change sync");
+                return;
             }
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
+
+            reconcileState();
         }
     }
 
@@ -447,28 +346,26 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      *
      * @param ownershipChange reported change
      */
-    @GuardedBy("lock")
+    @GuardedBy("this")
     private void lockedOwnershipChanged(final C ownershipChange) {
         final E entity = ownershipChange.getEntity();
         if (serviceEntity.equals(entity)) {
             serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+            markDirty();
         } else if (cleanupEntity.equals(entity)) {
             cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+            markDirty();
         } else {
             LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
         }
     }
 
+    @GuardedBy("this")
     private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
         if (jeopardy) {
             switch (state) {
                 case LOCAL_OWNERSHIP_GRANTED:
                 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
-                    if (cleanupEntityReg == null) {
-                        LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
-                        return;
-                    }
-
                     LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
                     cleanupEntityState = EntityState.OWNED_JEOPARDY;
                     break;
@@ -483,7 +380,6 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
                     throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
             }
 
-            stopServices();
             return;
         }
 
@@ -495,31 +391,10 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
         switch (state) {
             case LOCAL_OWNERSHIP_GRANTED:
             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
-                if (cleanupEntityReg == null) {
-                    LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
-                    return;
-                }
-
                 cleanupEntityState = EntityState.OWNED;
-                switch (localServicesState) {
-                    case STARTED:
-                        LOG.debug("Service group {} already has local services running", identifier);
-                        break;
-                    case STOPPED:
-                        startServices();
-                        break;
-                    case STOPPING:
-                        LOG.debug("Service group {} has local services stopping, postponing startup", identifier);
-                        break;
-                    default:
-                        throw new IllegalStateException("Unhandled local services state " + localServicesState);
-                }
                 break;
             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
-                cleanupEntityState = EntityState.UNOWNED;
-                stopServices();
-                break;
             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
             case REMOTE_OWNERSHIP_CHANGED:
                 cleanupEntityState = EntityState.UNOWNED;
@@ -530,20 +405,13 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
         }
     }
 
+    @GuardedBy("this")
     private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
         if (jeopardy) {
             LOG.info("Service group {} service entity ownership uncertain", identifier);
-
-            // Service entity ownership is uncertain, which means we want to record the state, but we do not want
-            // to stop local services nor do anything with the cleanup entity.
             switch (state) {
                 case LOCAL_OWNERSHIP_GRANTED:
                 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
-                    if (serviceEntityReg == null) {
-                        LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
-                        return;
-                    }
-
                     serviceEntityState = EntityState.OWNED_JEOPARDY;
                     break;
                 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
@@ -566,202 +434,291 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
         switch (state) {
             case LOCAL_OWNERSHIP_GRANTED:
             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
-                if (serviceEntityReg == null) {
-                    LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
-                    return;
-                }
-
+                LOG.debug("Service group {} acquired service entity ownership", identifier);
                 serviceEntityState = EntityState.OWNED;
-                takeOwnership();
                 break;
             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
-                LOG.debug("Service group {} lost service entity ownership", identifier);
-                serviceEntityState = EntityState.UNOWNED;
-                if (stopServices()) {
-                    LOG.debug("Service group {} already stopping services, postponing cleanup", identifier);
-                    return;
-                }
-
-                if (cleanupEntityReg != null) {
-                    cleanupEntityReg.close();
-                    cleanupEntityReg = null;
-                }
-                break;
             case REMOTE_OWNERSHIP_CHANGED:
             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
-                // No need to react, just update the state
+                LOG.debug("Service group {} lost service entity ownership", identifier);
                 serviceEntityState = EntityState.UNOWNED;
                 break;
             default:
                 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
-                break;
         }
     }
 
-    private void finishCloseIfNeeded() {
-        final SettableFuture<Void> future = closeFuture.get();
-        if (future != null) {
-            lock.lock();
+    // has to be called with lock asserted, which will be released prior to returning
+    private void reconcileState() {
+        // Always check if there is any state change to be applied.
+        while (true) {
             try {
-                lockedClose(future);
+                if (conditionalClean()) {
+                    tryReconcileState();
+                }
             } finally {
-                lock.unlock();
+                // We may have run a round of reconciliation, but any of these may have happened asynchronously:
+                // - registration
+                // - unregistration
+                // - service future completed
+                // - entity state changed
+                //
+                // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
+                // be dirty again. This closes the following race condition:
+                //
+                // A: runs these checks holding the lock
+                // B: modifies them, fails to acquire lock
+                // A: releases lock -> no one takes care of reconciliation
+
+                unlock();
             }
+
+            if (isDirty()) {
+                if (tryLock()) {
+                    LOG.debug("Service group {} re-running reconciliation", identifier);
+                    continue;
+                }
+
+                LOG.debug("Service group {} will be reconciled by someone else", identifier);
+            } else {
+                LOG.debug("Service group {} is completely reconciled", identifier);
+            }
+
+            break;
         }
     }
 
-    /*
-     * Help method to registered DoubleCandidateEntity. It is first step
-     * before the actual instance take Leadership.
-     */
-    private void takeOwnership() {
-        if (isClosed()) {
-            LOG.debug("Service group {} is closed, skipping cleanup ownership bid", identifier);
-            return;
+    private void serviceTransitionCompleted() {
+        markDirty();
+        if (tryLock()) {
+            reconcileState();
         }
+    }
+
+    // Has to be called with lock asserted
+    private void tryReconcileState() {
+        // First take a safe snapshot of current state on which we will base our decisions.
+        final Set<ClusterSingletonServiceRegistration> localMembers;
+        final boolean haveCleanup;
+        final boolean haveService;
+        synchronized (this) {
+            if (serviceEntityReg != null) {
+                switch (serviceEntityState) {
+                    case OWNED:
+                    case OWNED_JEOPARDY:
+                        haveService = true;
+                        break;
+                    case REGISTERED:
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        haveService = false;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+                }
+            } else {
+                haveService = false;
+            }
 
-        LOG.debug("Service group {} registering cleanup entity", identifier);
+            if (haveService && cleanupEntityReg == null) {
+                // We have the service entity but have not registered for cleanup entity. Do that now and retry.
+                LOG.debug("Service group {} registering cleanup entity", identifier);
+                try {
+                    cleanupEntityState = EntityState.REGISTERED;
+                    cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+                } catch (CandidateAlreadyRegisteredException e) {
+                    LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
+                    if (serviceEntityReg != null) {
+                        serviceEntityReg.close();
+                        serviceEntityReg = null;
+                    }
+                }
+                markDirty();
+                return;
+            }
 
-        startCapture();
-        try {
-            cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
-            cleanupEntityState = EntityState.REGISTERED;
-        } catch (CandidateAlreadyRegisteredException e) {
-            LOG.error("Service group {} failed to take ownership", identifier, e);
-        }
+            if (cleanupEntityReg != null) {
+                switch (cleanupEntityState) {
+                    case OWNED:
+                        haveCleanup = true;
+                        break;
+                    case OWNED_JEOPARDY:
+                    case REGISTERED:
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        haveCleanup = false;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+                }
+            } else {
+                haveCleanup = false;
+            }
 
-        endCapture().forEach(this::lockedOwnershipChanged);
-    }
+            localMembers = ImmutableSet.copyOf(members);
+        }
 
-    /*
-     * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void startServices() {
-        if (isClosed()) {
-            LOG.debug("Service group {} is closed, not starting services", identifier);
+        if (haveService && haveCleanup) {
+            ensureServicesStarting(localMembers);
             return;
         }
 
-        LOG.debug("Service group {} starting services", identifier);
-        serviceGroup.forEach(reg -> {
-            final ClusterSingletonService service = reg.getInstance();
-            LOG.debug("Starting service {}", service);
-            try {
-                service.instantiateServiceInstance();
-            } catch (Exception e) {
-                LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+        ensureServicesStopping();
+
+        if (!haveService && services.isEmpty()) {
+            LOG.debug("Service group {} has no running services", identifier);
+            final boolean canFinishClose;
+            synchronized (this) {
+                if (cleanupEntityReg != null) {
+                    LOG.debug("Service group {} releasing cleanup entity", identifier);
+                    cleanupEntityReg.close();
+                    cleanupEntityReg = null;
+                }
+
+                switch (cleanupEntityState) {
+                    case OWNED:
+                    case OWNED_JEOPARDY:
+                    case REGISTERED:
+                        // When we are registered we need to wait for registration to resolve, otherwise
+                        // the notification could be routed to the next incarnation of this group -- which could be
+                        // confused by the fact it is not registered, but receives, for example, OWNED notification.
+                        canFinishClose = false;
+                        break;
+                    case UNOWNED:
+                    case UNREGISTERED:
+                        canFinishClose = true;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
+                }
             }
-        });
 
-        localServicesState = ServiceState.STARTED;
-        LOG.debug("Service group {} services started", identifier);
+            if (canFinishClose) {
+                final SettableFuture<Void> localFuture = closeFuture.get();
+                if (localFuture != null && !localFuture.isDone()) {
+                    LOG.debug("Service group {} completing termination", identifier);
+                    localFuture.set(null);
+                }
+            }
+        }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    boolean stopServices() {
-        switch (localServicesState) {
-            case STARTED:
-                localServicesState = ServiceState.STOPPING;
-
-                final List<ListenableFuture<?>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
-                for (final ClusterSingletonServiceRegistration reg : serviceGroup) {
-                    final ClusterSingletonService service = reg.getInstance();
-                    final ListenableFuture<?> future;
-                    try {
-                        future = service.closeServiceInstance();
-                    } catch (Exception e) {
-                        LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
-                            service, e);
-                        continue;
-                    }
+    // Has to be called with lock asserted
+    //
+    // Reconciles the tracked 'services' map against localConfig in two passes: first every tracked registration
+    // that is not in localConfig is pushed towards stopping, then every registration in localConfig that is not
+    // tracked yet has its service instance started and is recorded as started.
+    @SuppressWarnings("illegalCatch")
+    private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
+        LOG.debug("Service group {} starting services", identifier);
+
+        // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
+        // example when this method is executed as part of unregisterService() call. In that case we need to ensure
+        // services in the list are stopping
+        final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+        while (it.hasNext()) {
+            final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+            final ClusterSingletonServiceRegistration reg = entry.getKey();
+            if (!localConfig.contains(reg)) {
+                // A non-null result is the info to keep tracking; null means there is nothing left to track
+                final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
+                if (newInfo != null) {
+                    entry.setValue(newInfo);
+                } else {
+                    it.remove();
+                }
+            }
+        }
+
+        // Start every member of localConfig that is not tracked in 'services'. A registration that is still
+        // tracked (including one that is in the middle of stopping) is left alone by this pass.
+        for (ClusterSingletonServiceRegistration reg : localConfig) {
+            if (!services.containsKey(reg)) {
+                final ClusterSingletonService service = reg.getInstance();
+                LOG.debug("Starting service {}", service);
 
-                    serviceCloseFutureList.add(future);
+                try {
+                    service.instantiateServiceInstance();
+                } catch (Exception e) {
+                    LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
+                        e);
+                    // The failed service is not recorded, so a later invocation of this method will see it as
+                    // untracked and attempt to start it again
+                    continue;
                 }
 
-                LOG.debug("Service group {} initiated service shutdown", identifier);
+                services.put(reg, ServiceInfo.started());
+            }
+        }
+    }
+
+    // Has to be called with lock asserted
+    //
+    // Pushes every tracked service towards stopping. An entry is dropped from 'services' as soon as
+    // ensureStopping() reports there is nothing left to track for it, otherwise its info is refreshed in place.
+    private void ensureServicesStopping() {
+        final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> iter = services.entrySet().iterator();
+        while (iter.hasNext()) {
+            final Entry<ClusterSingletonServiceRegistration, ServiceInfo> current = iter.next();
+            final ServiceInfo updated = ensureStopping(current.getKey(), current.getValue());
+            if (updated == null) {
+                // Nothing left to track for this registration
+                iter.remove();
+                continue;
+            }
+
+            current.setValue(updated);
+        }
+    }
+
+    /**
+     * Drive a single tracked service towards being stopped and report what should remain tracked for it.
+     *
+     * <p>
+     * For a {@code STARTED} service this invokes {@code closeServiceInstance()} and, if that call returns
+     * normally, attaches a callback which invokes {@code serviceTransitionCompleted()} once the returned future
+     * completes, whether it succeeds or fails. For a {@code STOPPING} service it only checks whether the
+     * previously-recorded future has completed.
+     *
+     * @param reg registration whose service instance is being stopped
+     * @param info currently-tracked info for {@code reg}
+     * @return the info to keep tracking for {@code reg}, or {@code null} when nothing is left to track: either the
+     *         attempt to close the instance threw, or the stopping future is already done
+     * @throws IllegalStateException if {@code info} reports a state other than {@code STARTED} or {@code STOPPING}
+     */
+    @SuppressWarnings("illegalCatch")
+    private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
+        switch (info.getState()) {
+            case STARTED:
+                final ClusterSingletonService service = reg.getInstance();
+
+                LOG.debug("Service group {} stopping service {}", identifier, service);
+                final @NonNull ListenableFuture<?> future;
+                try {
+                    // verifyNotNull() sits inside the try block, so a null future is handled by the catch below
+                    // just like an exception thrown by the service itself
+                    future = verifyNotNull(service.closeServiceInstance());
+                } catch (Exception e) {
+                    LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
+                        e);
+                    return null;
+                }
 
-                Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<?>>() {
+                // Both outcomes are treated the same way: the service is considered to have finished stopping.
+                // With a direct executor the callback runs on the thread completing the future, or right here
+                // if the future is already complete.
+                Futures.addCallback(future, new FutureCallback<Object>() {
                     @Override
-                    public void onFailure(final Throwable cause) {
-                        LOG.warn("Service group {} service stopping reported error", identifier, cause);
-                        onServicesStopped();
+                    public void onSuccess(final Object result) {
+                        LOG.debug("Service group {} service {} stopped successfully", identifier, service);
+                        serviceTransitionCompleted();
                     }
 
                     @Override
-                    public void onSuccess(final List<?> nulls) {
-                        onServicesStopped();
+                    public void onFailure(final Throwable cause) {
+                        LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
+                        serviceTransitionCompleted();
                     }
                 }, MoreExecutors.directExecutor());
-
-                return localServicesState == ServiceState.STOPPING;
-            case STOPPED:
-                LOG.debug("Service group {} has already stopped services", identifier);
-                return false;
+                return info.toState(ServiceState.STOPPING, future);
             case STOPPING:
-                LOG.debug("Service group {} is already stopping services", identifier);
-                return true;
+                // Stop was requested earlier; once its future is done there is nothing left to track
+                if (info.getFuture().isDone()) {
+                    LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
+                    return null;
+                }
+                return info;
             default:
-                throw new IllegalStateException("Unhandled local services state " + localServicesState);
+                throw new IllegalStateException("Unhandled state " + info.getState());
         }
     }
 
-    void onServicesStopped() {
-        LOG.debug("Service group {} finished stopping services", identifier);
-        lock.lock();
-        try {
-            localServicesState = ServiceState.STOPPED;
+    /**
+     * Raise the dirty flag by storing {@code 1} into the {@code dirty} field.
+     */
+    private void markDirty() {
+        this.dirty = 1;
+    }
 
-            if (isClosed()) {
-                LOG.debug("Service group {} closed, skipping service restart check", identifier);
-                return;
-            }
+    /**
+     * Check whether the dirty flag is currently raised.
+     *
+     * @return {@code true} if the {@code dirty} field holds a non-zero value
+     */
+    private boolean isDirty() {
+        final int observed = this.dirty;
+        return observed != 0;
+    }
 
-            // If we lost the service entity while services were stopping, we need to unregister cleanup entity
-            switch (serviceEntityState) {
-                case OWNED:
-                case OWNED_JEOPARDY:
-                    // No need to churn cleanup entity
-                    break;
-                case REGISTERED:
-                case UNOWNED:
-                case UNREGISTERED:
-                    if (cleanupEntityReg != null) {
-                        startCapture();
-                        cleanupEntityReg.close();
-                        cleanupEntityReg = null;
-                        endCapture().forEach(this::lockedOwnershipChanged);
-                    }
-                    break;
-                default:
-                    throw new IllegalStateException("Unhandled service entity state" + serviceEntityState);
-            }
+    /**
+     * Attempt to atomically flip the value managed by {@code DIRTY_UPDATER} from {@code 1} to {@code 0}.
+     *
+     * @return {@code true} if the value was {@code 1} and has been reset, {@code false} if it was left untouched
+     */
+    private boolean conditionalClean() {
+        final boolean wasDirty = DIRTY_UPDATER.compareAndSet(this, 1, 0);
+        return wasDirty;
+    }
 
-            if (cleanupEntityReg == null) {
-                LOG.debug("Service group {} does not have cleanup entity registered, skipping restart check",
-                    identifier);
-                return;
-            }
+    /**
+     * Attempt to take the lock without blocking, by atomically flipping the value managed by
+     * {@code LOCK_UPDATER} from {@code 0} to {@code 1}.
+     *
+     * @return {@code true} if this call performed the flip, {@code false} if the value was not {@code 0}
+     */
+    private boolean tryLock() {
+        final boolean acquired = LOCK_UPDATER.compareAndSet(this, 0, 1);
+        return acquired;
+    }
 
-            // Double-check if the services should really be down
-            switch (cleanupEntityState) {
-                case OWNED:
-                    // We have finished stopping services, but we own cleanup, e.g. we should start them again.
-                    startServices();
-                    return;
-                case UNOWNED:
-                case OWNED_JEOPARDY:
-                case REGISTERED:
-                case UNREGISTERED:
-                    break;
-                default:
-                    throw new IllegalStateException("Unhandled cleanup entity state" + cleanupEntityState);
-            }
-        } finally {
-            lock.unlock();
-            finishCloseIfNeeded();
-        }
+    /**
+     * Release the lock by atomically flipping the value managed by {@code LOCK_UPDATER} from {@code 1} back to
+     * {@code 0}. The flip is verified, so calling this while the value is not {@code 1} fails the verification
+     * instead of returning.
+     *
+     * @return always {@code true}
+     */
+    private boolean unlock() {
+        final boolean released = LOCK_UPDATER.compareAndSet(this, 1, 0);
+        verify(released);
+        return true;
     }
 
     @Override