BUG-8858: rework singleton group locking
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
index 0d4920bed3da77739960b054104b817382a0fbaf..82897e6c60366571ee430dd1e675b93194ca9fa8 100644 (file)
@@ -9,36 +9,45 @@
 package org.opendaylight.mdsal.singleton.dom.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.CheckReturnValue;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
-import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yangtools.concepts.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of {@link ClusterSingletonServiceGroup}.
+ * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entitiy Ownership Service. Since EOS is atomic
+ * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
+ *
+ * <p>
+ * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
+ * a result on new candidates appearing. We use two entities:
+ * - service entity, to which all nodes register
+ * - cleanup entity, which only the service entity owner registers to
+ *
+ * <p>
+ * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
+ * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
+ * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
+ * granted until the old owner finishes the cleanup.
  *
  * @param <P> the instance identifier path type
  * @param <E> the GenericEntity type
@@ -46,218 +55,376 @@ import org.slf4j.LoggerFactory;
  * @param <G> the GenericEntityOwnershipListener type
  * @param <S> the GenericEntityOwnershipService type
  */
-@VisibleForTesting
 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
-                                             C extends GenericEntityOwnershipChange<P, E>,
-                                             G extends GenericEntityOwnershipListener<P, C>,
-                                             S extends GenericEntityOwnershipService<P, E, G>>
-        implements ClusterSingletonServiceGroup<P, E, C> {
+        C extends GenericEntityOwnershipChange<P, E>,  G extends GenericEntityOwnershipListener<P, C>,
+        S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
+    private enum State {
+        /**
+         * This group has been freshly allocated and has not been started yet.
+         */
+        INITIAL,
+        /**
+         * Operational state. Service entity is registered, but ownership was not resolved yet.
+         */
+        REGISTERED,
+        /**
+         * Operational state. Service entity confirmed to be follower.
+         */
+        STANDBY,
+        /**
+         * Service entity acquired. Attempting to acquire cleanup entity.
+         */
+        TAKING_OWNERSHIP,
+        /**
+         * Both entities held and user services are being started.
+         */
+        STARTING_SERVICES,
+        /**
+         * Steady state. Both entities held and services have finished starting.
+         */
+        OWNER,
+        /**
+         * User services are being stopped due to either loss of an entity or a shutdown.
+         */
+        STOPPING_SERVICES,
+        /**
+         * We have stopped services and are now relinquishing the cleanup entity.
+         */
+        RELEASING_OWNERSHIP,
+        /**
+         * Terminated, this group cannot be used anymore.
+         */
+        TERMINATED
+    }
 
-    private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
 
     private final S entityOwnershipService;
-    private final String clusterSingletonGroupIdentifier;
-    private final Semaphore clusterLock = new Semaphore(1, true);
+    private final String identifier;
 
     /* Entity instances */
     private final E serviceEntity;
-    private final E doubleCandidateEntity;
-
-    // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
-    // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
-    // FOLLOWER : baseCandidate is registered correctly
-    // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
-    // LEADER : both candidate have mastership from EOS
-    // TERMINATED : service go down
-    @GuardedBy("clusterLock")
-    private boolean hasOwnership = false;
-    @GuardedBy("clusterLock")
-    private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
-    private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
+    private final E cleanupEntity;
+
+    private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
+    private final ReentrantLock lock = new ReentrantLock(true);
+
+    @GuardedBy("lock")
+    private final List<ClusterSingletonService> serviceGroup;
+
+    @GuardedBy("lock")
+    private State state = State.INITIAL;
+
+    @GuardedBy("lock")
+    private List<C> capture;
 
     /* EOS Candidate Registrations */
-    private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
-    private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
+    @GuardedBy("lock")
+    private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
+    @GuardedBy("lock")
+    private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
 
     /**
-     * Class constructor.
+     * Class constructor. Note: last argument is reused as-is.
      *
-     * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
+     * @param identifier non-empty string as identifier
      * @param mainEntity as Entity instance
      * @param closeEntity as Entity instance
      * @param entityOwnershipService GenericEntityOwnershipService instance
-     * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
+     * @param parent parent service
+     * @param services Services list
      */
-    ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
-            final E closeEntity, final S entityOwnershipService,
-            final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
-        LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
-        this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
+    ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
+            final E closeEntity, final List<ClusterSingletonService> services) {
+        Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+        this.identifier = identifier;
         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
-        this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
-        this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
+        this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
+        this.serviceGroup = Preconditions.checkNotNull(services);
+        LOG.debug("Instantiated new service group for {}", identifier);
+    }
+
+    @VisibleForTesting
+    ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
+            final E closeEntity, final S entityOwnershipService) {
+        this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
-    public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
-        LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
-        boolean needReleaseLock = false;
-        final ListenableFuture<List<Void>> destroyFuture;
+    ListenableFuture<?> closeClusterSingletonGroup() {
+        // Assert our future first
+        final SettableFuture<Void> future = SettableFuture.create();
+        final SettableFuture<Void> existing = closeFuture.getAndSet(future);
+        if (existing != null) {
+            return existing;
+        }
+
+        if (!lock.tryLock()) {
+            // The lock is held, the cleanup will be finished by the owner thread
+            LOG.debug("Singleton group {} cleanup postponed", identifier);
+            return future;
+        }
+
         try {
-            needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
-        } catch (final Exception e) {
-            LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
-                    e);
+            lockedClose(future);
         } finally {
-            if (serviceEntityCandidateReg != null) {
-                serviceEntityCandidateReg.close();
-                serviceEntityCandidateReg = null;
-            }
-            final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
-            if (hasOwnership) {
-                for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
-                    try {
-                        serviceCloseFutureList.add(service.closeServiceInstance());
-                    } catch (final RuntimeException e) {
-                        LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
-                                service.getIdentifier());
-                    }
-                }
-                hasOwnership = false;
-            }
-            destroyFuture = Futures.allAsList(serviceCloseFutureList);
-            final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
-            Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
+            lock.unlock();
         }
-        return destroyFuture;
+
+        LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
+        return future;
+    }
+
+    private boolean isClosed() {
+        return closeFuture.get() != null;
+    }
+
+    @GuardedBy("lock")
+    private void updateState(final State newState) {
+        LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
+        state = Verify.verifyNotNull(newState);
+    }
+
+    @GuardedBy("lock")
+    private void lockedClose(final SettableFuture<Void> future) {
+        if (serviceEntityReg != null) {
+            LOG.debug("Service group {} unregistering", identifier);
+            serviceEntityReg.close();
+            serviceEntityReg = null;
+        }
+
+        switch (state) {
+            case INITIAL:
+                // Not started: not much to do
+                terminate(future);
+                break;
+            case TERMINATED:
+                // Already done: no-op
+                break;
+            case REGISTERED:
+            case STANDBY:
+                LOG.debug("Service group {} terminated", identifier);
+                terminate(future);
+                break;
+            case OWNER:
+                // No-op, we will react to the loss of registration instead.
+                break;
+            case STOPPING_SERVICES:
+                // Waiting for services. Will resume once we get notified.
+                break;
+            case RELEASING_OWNERSHIP:
+                // Waiting for cleanup entity to flip, will resume afterwards.
+                break;
+            case TAKING_OWNERSHIP:
+                // Abort taking of ownership and close
+                LOG.debug("Service group {} aborting ownership bid", identifier);
+                cleanupEntityReg.close();
+                cleanupEntityReg = null;
+                updateState(State.RELEASING_OWNERSHIP);
+                break;
+            default:
+                throw new IllegalStateException("Unhandled state " + state);
+        }
+    }
+
+    @GuardedBy("lock")
+    private void terminate(final SettableFuture<Void> future) {
+        updateState(State.TERMINATED);
+        Verify.verify(future.set(null));
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
-    public void initializationClusterSingletonGroup() {
-        LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
+    void initialize() throws CandidateAlreadyRegisteredException {
+        LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
+
+        lock.lock();
         try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            Verify.verify(serviceGroup.isEmpty());
-            Verify.verify(!hasOwnership);
-            Verify.verify(serviceEntityCandidateReg == null);
-            serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
-        } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
-            LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-            throw new RuntimeException(e);
+            Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
+                    state);
+
+            // Catch events if they fire during this call
+            capture = new ArrayList<>(0);
+            serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+            state = State.REGISTERED;
+
+            final List<C> captured = capture;
+            capture = null;
+            captured.forEach(this::lockedOwnershipChanged);
         } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+            lock.unlock();
         }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void checkNotClosed() {
+        Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
+                identifier);
+    }
+
     @Override
-    public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
-        LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
-        Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
-        ClusterSingletonServiceRegistrationDelegator reg = null;
+    void registerService(final ClusterSingletonService service) {
+        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+        checkNotClosed();
+
+        LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
+
+        lock.lock();
         try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            Verify.verify(serviceEntityCandidateReg != null);
-            reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
-            serviceGroup.add(reg);
-            if (hasOwnership) {
-                service.instantiateServiceInstance();
+            Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
+            serviceGroup.add(service);
+
+            switch (state) {
+                case OWNER:
+                case STARTING_SERVICES:
+                    service.instantiateServiceInstance();
+                    break;
+                default:
+                    break;
             }
-        } catch (final RuntimeException | InterruptedException e) {
-            LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-            throw new RuntimeException(e);
         } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+            lock.unlock();
         }
-        return reg;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
+    @CheckReturnValue
     @Override
-    public void unregisterService(final ClusterSingletonService service) {
-        LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
-        Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
+    boolean unregisterService(final ClusterSingletonService service) {
+        Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+        checkNotClosed();
+
+        lock.lock();
         try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            if (serviceGroup.size() > 1) {
-                if (hasOwnership) {
+            // There is a slight problem here, as the type does not match the list type, hence we need to tread
+            // carefully.
+            if (serviceGroup.size() == 1) {
+                Verify.verify(serviceGroup.contains(service));
+                return true;
+            }
+
+            Verify.verify(serviceGroup.remove(service));
+            LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+
+            switch (state) {
+                case OWNER:
+                case STARTING_SERVICES:
                     service.closeServiceInstance();
-                }
-                serviceGroup.remove(service);
-                LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
-            } else {
-                needCloseProviderInstance = true;
+                    break;
+                default:
+                    break;
             }
-        } catch (final RuntimeException | InterruptedException e) {
-            LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-            throw new RuntimeException(e);
+
+            return false;
         } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+            lock.unlock();
+            finishCloseIfNeeded();
         }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
-    public void ownershipChanged(final C ownershipChange) {
-        LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
-                clusterSingletonGroupIdentifier);
+    void ownershipChanged(final C ownershipChange) {
+        LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
+
+        lock.lock();
         try {
-            if (ownershipChange.inJeopardy()) {
-                LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
-                lostOwnership();
-                return;
+            if (capture != null) {
+                capture.add(ownershipChange);
+            } else {
+                lockedOwnershipChanged(ownershipChange);
             }
-            if (serviceEntity.equals(ownershipChange.getEntity())) {
-                if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
-                    /*
-                     * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
-                     */
-                    tryToTakeOwnership();
-                } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
-                        || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
-                                .equals(ownershipChange.getState())) {
-                    /*
-                     * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
-                     */
-                    lostOwnership();
-                } else {
-                    /* Not needed notifications */
-                    LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
-                            clusterSingletonGroupIdentifier);
+        } finally {
+            lock.unlock();
+            finishCloseIfNeeded();
+        }
+    }
+
+    private void lockedOwnershipChanged(final C ownershipChange) {
+        if (ownershipChange.inJeopardy()) {
+            LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
+            lostOwnership();
+            return;
+        }
+
+        final E entity = ownershipChange.getEntity();
+        if (serviceEntity.equals(entity)) {
+            serviceOwnershipChanged(ownershipChange);
+        } else if (cleanupEntity.equals(entity)) {
+            cleanupCandidateOwnershipChanged(ownershipChange);
+        } else {
+            LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
+        }
+    }
+
+    private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
+        switch (ownershipChange.getState()) {
+            case LOCAL_OWNERSHIP_GRANTED:
+                switch (state) {
+                    case TAKING_OWNERSHIP:
+                        // SLAVE to MASTER
+                        startServices();
+                        return;
+                    default:
+                        break;
                 }
-            } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
-                if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
-                    /*
-                     * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
-                     */
-                    takeOwnership();
-                } else {
-                    /* Not needed notifications */
-                    LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
-                            ownershipChange, clusterSingletonGroupIdentifier);
+                break;
+            case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+            case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+                switch (state) {
+                    case RELEASING_OWNERSHIP:
+                        // Slight cheat: if we are closing down, we just need to notify the future
+                        updateState(isClosed() ? State.INITIAL : State.STANDBY);
+                        return;
+                    case STARTING_SERVICES:
+                    case OWNER:
+                    case TAKING_OWNERSHIP:
+                        LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
+                        return;
+                    default:
+                        break;
                 }
-            } else {
-                LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
+
+                break;
+            case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+            case REMOTE_OWNERSHIP_CHANGED:
+            case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+            default:
+                break;
+        }
+
+        LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
+    }
+
+    private void serviceOwnershipChanged(final C ownershipChange) {
+        switch (ownershipChange.getState()) {
+            case LOCAL_OWNERSHIP_GRANTED:
+                // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
+                takeOwnership();
+                break;
+            case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+            case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+                // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
+                lostOwnership();
+                break;
+            default:
+                // Not needed notifications
+                LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
+                    ownershipChange);
+        }
+    }
+
+    private void finishCloseIfNeeded() {
+        final SettableFuture<Void> future = closeFuture.get();
+        if (future != null) {
+            lock.lock();
+            try {
+                lockedClose(future);
+            } finally {
+                lock.unlock();
             }
-        } catch (final Exception e) {
-            LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
-            // TODO : think about close ... is it necessary?
         }
     }
 
@@ -265,56 +432,44 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      * Help method to registered DoubleCandidateEntity. It is first step
      * before the actual instance take Leadership.
      */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void tryToTakeOwnership() {
-        LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
+    private void takeOwnership() {
+        if (isClosed()) {
+            LOG.debug("Service group {} is closed, not taking ownership", identifier);
+            return;
+        }
+
+        LOG.debug("Group {} taking ownership", identifier);
+
+        updateState(State.TAKING_OWNERSHIP);
         try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            if (serviceEntityCandidateReg != null) {
-                Verify.verify(asyncCloseEntityCandidateReg == null);
-                asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
-            } else {
-                LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
-            }
-        } catch (final Exception e) {
-            LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
-                    clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-        } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+            cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+        } catch (CandidateAlreadyRegisteredException e) {
+            LOG.error("Service group {} failed to take ownership", identifier, e);
         }
     }
 
     /*
-     * Help method calls setupService method for create single cluster-wide service instance.
+     * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void takeOwnership() {
-        LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
-        try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            if (serviceEntityCandidateReg != null) {
-                Verify.verify(asyncCloseEntityCandidateReg != null);
-                for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
-                    service.instantiateServiceInstance();
-                }
-                hasOwnership = true;
-            } else {
-                LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
-            }
-        } catch (final RuntimeException | InterruptedException e) {
-            LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
-                    clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-        } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+    private void startServices() {
+        if (isClosed()) {
+            LOG.debug("Service group {} is closed, not starting services", identifier);
+            return;
         }
+
+        LOG.debug("Service group {} starting services", identifier);
+        serviceGroup.forEach(service -> {
+            LOG.debug("Starting service {}", service);
+            try {
+                service.instantiateServiceInstance();
+            } catch (Exception e) {
+                LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+            }
+        });
+
+        LOG.debug("Service group {} services started", identifier);
+        updateState(State.OWNER);
     }
 
     /*
@@ -322,110 +477,84 @@ final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends Generi
      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
      * new election for DoubleCandidateEntity.
      */
-    @SuppressWarnings("checkstyle:IllegalCatch")
     private void lostOwnership() {
-        LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
-        boolean needReleaseLock = false;
-        boolean needCloseProviderInstance = false;
-        try {
-            clusterLock.acquire();
-            needReleaseLock = true;
-            final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
-            if (hasOwnership) {
-                Verify.verify(asyncCloseEntityCandidateReg != null);
-                for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
-                    try {
-                        serviceCloseFutureList.add(service.closeServiceInstance());
-                    } catch (final RuntimeException e) {
-                        LOG.error("Unexpected exception while closing service: {}, resuming with next..",
-                                service.getIdentifier());
-                    }
-                }
-                hasOwnership = false;
-            }
-
-            final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
-            if (serviceEntityCandidateReg != null) {
-                // we don't want to remove this instance from map
-                Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
-            } else {
-                // we have to remove this ClusterSingletonServiceGroup instance from map
-                Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
-            }
-            /*
-             * We wish to stop all possible EOS activities before we don't close
-             * a close candidate registration that acts as a guard. So we don't want
-             * to release Semaphore (clusterLock) before we are not fully finished.
-             * Semaphore lock release has to be realized as FutureCallback after a service
-             * instance has fully closed prior to relinquishing service ownership.
-             */
-            needReleaseLock = false;
-        } catch (final InterruptedException e) {
-            LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
-                    clusterSingletonGroupIdentifier, e);
-            needCloseProviderInstance = true;
-        } finally {
-            closeResources(needReleaseLock, needCloseProviderInstance);
+        LOG.debug("Service group {} lost ownership in state {}", identifier, state);
+        switch (state) {
+            case REGISTERED:
+                updateState(State.STANDBY);
+                break;
+            case OWNER:
+                stopServices();
+                break;
+            case STARTING_SERVICES:
+            case STOPPING_SERVICES:
+                // No-op, as these will re-check state before proceeding
+                break;
+            case TAKING_OWNERSHIP:
+                cleanupEntityReg.close();
+                cleanupEntityReg = null;
+                updateState(State.STANDBY);
+                break;
+            case INITIAL:
+            case TERMINATED:
+            default:
+                LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
+                break;
         }
     }
 
-    /*
-     * Help method for finalization every acquired functionality
-     */
-    @GuardedBy("clusterLock")
-    private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
-        if (needCloseProvider) {
-            // The Game Over for this ClusterSingletonServiceGroup instance
-            if (serviceEntityCandidateReg != null) {
-                serviceEntityCandidateReg.close();
-                serviceEntityCandidateReg = null;
-            }
-            // Remove instance immediately because actual state is follower or initialization
-            if (asyncCloseEntityCandidateReg == null) {
-                allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
-            }
-        }
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    void stopServices() {
+        updateState(State.STOPPING_SERVICES);
 
-        if (needReleaseLock) {
-            clusterLock.release();
-        }
-    }
+        final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
+        for (final ClusterSingletonService service : serviceGroup) {
+            final ListenableFuture<Void> future;
 
-    /*
-     * Help method creates FutureCallback for suspend Future
-     */
-    private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
-            final boolean isInCloseProcess) {
-        final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
-            if (throwable != null) {
-                LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
-            } else {
-                LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
-            }
-            if (asyncCloseEntityCandidateReg != null) {
-                asyncCloseEntityCandidateReg.close();
-                asyncCloseEntityCandidateReg = null;
-            }
-            if (isInCloseProcess) {
-                allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
+            try {
+                future = service.closeServiceInstance();
+            } catch (Exception e) {
+                LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
+                    service, e);
+                continue;
             }
-            if (semaphore != null) {
-                semaphore.release();
-            }
-        };
 
-        return new FutureCallback<List<Void>>() {
+            serviceCloseFutureList.add(future);
+        }
 
+        Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
             @Override
-            public void onSuccess(final List<Void> result) {
-                closeEntityCandidateRegistration.accept(null);
+            public void onFailure(final Throwable cause) {
+                LOG.warn("Service group {} service stopping reported error", identifier, cause);
+                onServicesStopped();
             }
 
             @Override
-            public void onFailure(final Throwable throwable) {
-                closeEntityCandidateRegistration.accept(throwable);
+            public void onSuccess(final List<Void> nulls) {
+                onServicesStopped();
             }
-        };
+        });
     }
 
+    void onServicesStopped() {
+        LOG.debug("Service group {} finished stopping services", identifier);
+        lock.lock();
+        try {
+            if (cleanupEntityReg != null) {
+                updateState(State.RELEASING_OWNERSHIP);
+                cleanupEntityReg.close();
+                cleanupEntityReg = null;
+            } else {
+                updateState(State.STANDBY);
+            }
+        } finally {
+            lock.unlock();
+            finishCloseIfNeeded();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
+    }
 }