package org.opendaylight.mdsal.singleton.dom.impl;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
-import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.yangtools.concepts.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implementation of {@link ClusterSingletonServiceGroup}.
+ * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entitiy Ownership Service. Since EOS is atomic
+ * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
+ *
+ * <p>
+ * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
+ * a result on new candidates appearing. We use two entities:
+ * - service entity, to which all nodes register
+ * - cleanup entity, which only the service entity owner registers to
+ *
+ * <p>
+ * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
+ * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
+ * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
+ * granted until the old owner finishes the cleanup.
*
* @param <P> the instance identifier path type
* @param <E> the GenericEntity type
* @param <G> the GenericEntityOwnershipListener type
* @param <S> the GenericEntityOwnershipService type
*/
-@VisibleForTesting
final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
- C extends GenericEntityOwnershipChange<P, E>,
- G extends GenericEntityOwnershipListener<P, C>,
- S extends GenericEntityOwnershipService<P, E, G>>
- implements ClusterSingletonServiceGroup<P, E, C> {
+ C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
+ S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
+ private enum State {
+ /**
+ * This group has been freshly allocated and has not been started yet.
+ */
+ INITIAL,
+ /**
+ * Operational state. Service entity is registered, but ownership was not resolved yet.
+ */
+ REGISTERED,
+ /**
+ * Operational state. Service entity confirmed to be follower.
+ */
+ STANDBY,
+ /**
+ * Service entity acquired. Attempting to acquire cleanup entity.
+ */
+ TAKING_OWNERSHIP,
+ /**
+ * Both entities held and user services are being started.
+ */
+ STARTING_SERVICES,
+ /**
+ * Steady state. Both entities held and services have finished starting.
+ */
+ OWNER,
+ /**
+ * User services are being stopped due to either loss of an entity or a shutdown.
+ */
+ STOPPING_SERVICES,
+ /**
+ * We have stopped services and are now relinquishing the cleanup entity.
+ */
+ RELEASING_OWNERSHIP,
+ /**
+ * Terminated, this group cannot be used anymore.
+ */
+ TERMINATED
+ }
- private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
private final S entityOwnershipService;
- private final String clusterSingletonGroupIdentifier;
- private final Semaphore clusterLock = new Semaphore(1, true);
+ private final String identifier;
/* Entity instances */
private final E serviceEntity;
- private final E doubleCandidateEntity;
-
- // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
- // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
- // FOLLOWER : baseCandidate is registered correctly
- // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
- // LEADER : both candidate have mastership from EOS
- // TERMINATED : service go down
- @GuardedBy("clusterLock")
- private boolean hasOwnership = false;
- @GuardedBy("clusterLock")
- private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
- private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
+ private final E cleanupEntity;
+
+ private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
+ private final ReentrantLock lock = new ReentrantLock(true);
+
+ @GuardedBy("lock")
+ private final List<ClusterSingletonService> serviceGroup;
+
+ @GuardedBy("lock")
+ private State state = State.INITIAL;
+
+ @GuardedBy("lock")
+ private List<C> capture;
/* EOS Candidate Registrations */
- private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
- private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
+ @GuardedBy("lock")
+ private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
+ @GuardedBy("lock")
+ private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
/**
- * Class constructor.
+ * Class constructor. Note: last argument is reused as-is.
*
- * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
+ * @param identifier non-empty string as identifier
* @param mainEntity as Entity instance
* @param closeEntity as Entity instance
* @param entityOwnershipService GenericEntityOwnershipService instance
- * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
+ * @param parent parent service
+ * @param services Services list
*/
- ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
- final E closeEntity, final S entityOwnershipService,
- final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
- LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
- this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
+ ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
+ final E closeEntity, final List<ClusterSingletonService> services) {
+ Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+ this.identifier = identifier;
this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
this.serviceEntity = Preconditions.checkNotNull(mainEntity);
- this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
- this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
+ this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
+ this.serviceGroup = Preconditions.checkNotNull(services);
+ LOG.debug("Instantiated new service group for {}", identifier);
+ }
+
+ @VisibleForTesting
+ ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
+ final E closeEntity, final S entityOwnershipService) {
+ this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
- LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
- boolean needReleaseLock = false;
- final ListenableFuture<List<Void>> destroyFuture;
+ ListenableFuture<?> closeClusterSingletonGroup() {
+ // Assert our future first
+ final SettableFuture<Void> future = SettableFuture.create();
+ final SettableFuture<Void> existing = closeFuture.getAndSet(future);
+ if (existing != null) {
+ return existing;
+ }
+
+ if (!lock.tryLock()) {
+ // The lock is held, the cleanup will be finished by the owner thread
+ LOG.debug("Singleton group {} cleanup postponed", identifier);
+ return future;
+ }
+
try {
- needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
- } catch (final Exception e) {
- LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
- e);
+ lockedClose(future);
} finally {
- if (serviceEntityCandidateReg != null) {
- serviceEntityCandidateReg.close();
- serviceEntityCandidateReg = null;
- }
- final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
- if (hasOwnership) {
- for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- try {
- serviceCloseFutureList.add(service.closeServiceInstance());
- } catch (final RuntimeException e) {
- LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
- service.getIdentifier());
- }
- }
- hasOwnership = false;
- }
- destroyFuture = Futures.allAsList(serviceCloseFutureList);
- final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
- Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
+ lock.unlock();
}
- return destroyFuture;
+
+ LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
+ return future;
+ }
+
+ private boolean isClosed() {
+ return closeFuture.get() != null;
+ }
+
+ @GuardedBy("lock")
+ private void updateState(final State newState) {
+ LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
+ state = Verify.verifyNotNull(newState);
+ }
+
+ @GuardedBy("lock")
+ private void lockedClose(final SettableFuture<Void> future) {
+ if (serviceEntityReg != null) {
+ LOG.debug("Service group {} unregistering", identifier);
+ serviceEntityReg.close();
+ serviceEntityReg = null;
+ }
+
+ switch (state) {
+ case INITIAL:
+ // Not started: not much to do
+ terminate(future);
+ break;
+ case TERMINATED:
+ // Already done: no-op
+ break;
+ case REGISTERED:
+ case STANDBY:
+ LOG.debug("Service group {} terminated", identifier);
+ terminate(future);
+ break;
+ case OWNER:
+ // No-op, we will react to the loss of registration instead.
+ break;
+ case STOPPING_SERVICES:
+ // Waiting for services. Will resume once we get notified.
+ break;
+ case RELEASING_OWNERSHIP:
+ // Waiting for cleanup entity to flip, will resume afterwards.
+ break;
+ case TAKING_OWNERSHIP:
+ // Abort taking of ownership and close
+ LOG.debug("Service group {} aborting ownership bid", identifier);
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ updateState(State.RELEASING_OWNERSHIP);
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + state);
+ }
+ }
+
+ @GuardedBy("lock")
+ private void terminate(final SettableFuture<Void> future) {
+ updateState(State.TERMINATED);
+ Verify.verify(future.set(null));
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public void initializationClusterSingletonGroup() {
- LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
+ void initialize() throws CandidateAlreadyRegisteredException {
+ LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
+
+ lock.lock();
try {
- clusterLock.acquire();
- needReleaseLock = true;
- Verify.verify(serviceGroup.isEmpty());
- Verify.verify(!hasOwnership);
- Verify.verify(serviceEntityCandidateReg == null);
- serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
- } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
- LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- throw new RuntimeException(e);
+ Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
+ state);
+
+ // Catch events if they fire during this call
+ capture = new ArrayList<>(0);
+ serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+ state = State.REGISTERED;
+
+ final List<C> captured = capture;
+ capture = null;
+ captured.forEach(this::lockedOwnershipChanged);
} finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ lock.unlock();
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
+ private void checkNotClosed() {
+ Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
+ identifier);
+ }
+
@Override
- public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
- LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
- Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
- ClusterSingletonServiceRegistrationDelegator reg = null;
+ void registerService(final ClusterSingletonService service) {
+ Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ checkNotClosed();
+
+ LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
+
+ lock.lock();
try {
- clusterLock.acquire();
- needReleaseLock = true;
- Verify.verify(serviceEntityCandidateReg != null);
- reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
- serviceGroup.add(reg);
- if (hasOwnership) {
- service.instantiateServiceInstance();
+ Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
+ serviceGroup.add(service);
+
+ switch (state) {
+ case OWNER:
+ case STARTING_SERVICES:
+ service.instantiateServiceInstance();
+ break;
+ default:
+ break;
}
- } catch (final RuntimeException | InterruptedException e) {
- LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- throw new RuntimeException(e);
} finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ lock.unlock();
}
- return reg;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
+ @CheckReturnValue
@Override
- public void unregisterService(final ClusterSingletonService service) {
- LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
- Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
+ boolean unregisterService(final ClusterSingletonService service) {
+ Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ checkNotClosed();
+
+ lock.lock();
try {
- clusterLock.acquire();
- needReleaseLock = true;
- if (serviceGroup.size() > 1) {
- if (hasOwnership) {
+ // There is a slight problem here, as the type does not match the list type, hence we need to tread
+ // carefully.
+ if (serviceGroup.size() == 1) {
+ Verify.verify(serviceGroup.contains(service));
+ return true;
+ }
+
+ Verify.verify(serviceGroup.remove(service));
+ LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+
+ switch (state) {
+ case OWNER:
+ case STARTING_SERVICES:
service.closeServiceInstance();
- }
- serviceGroup.remove(service);
- LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
- } else {
- needCloseProviderInstance = true;
+ break;
+ default:
+ break;
}
- } catch (final RuntimeException | InterruptedException e) {
- LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- throw new RuntimeException(e);
+
+ return false;
} finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ lock.unlock();
+ finishCloseIfNeeded();
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public void ownershipChanged(final C ownershipChange) {
- LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
- clusterSingletonGroupIdentifier);
+ void ownershipChanged(final C ownershipChange) {
+ LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
+
+ lock.lock();
try {
- if (ownershipChange.inJeopardy()) {
- LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
- lostOwnership();
- return;
+ if (capture != null) {
+ capture.add(ownershipChange);
+ } else {
+ lockedOwnershipChanged(ownershipChange);
}
- if (serviceEntity.equals(ownershipChange.getEntity())) {
- if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
- /*
- * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
- */
- tryToTakeOwnership();
- } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
- || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
- .equals(ownershipChange.getState())) {
- /*
- * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
- */
- lostOwnership();
- } else {
- /* Not needed notifications */
- LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
- clusterSingletonGroupIdentifier);
+ } finally {
+ lock.unlock();
+ finishCloseIfNeeded();
+ }
+ }
+
+ private void lockedOwnershipChanged(final C ownershipChange) {
+ if (ownershipChange.inJeopardy()) {
+ LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
+ lostOwnership();
+ return;
+ }
+
+ final E entity = ownershipChange.getEntity();
+ if (serviceEntity.equals(entity)) {
+ serviceOwnershipChanged(ownershipChange);
+ } else if (cleanupEntity.equals(entity)) {
+ cleanupCandidateOwnershipChanged(ownershipChange);
+ } else {
+ LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
+ }
+ }
+
+ private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
+ switch (ownershipChange.getState()) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ switch (state) {
+ case TAKING_OWNERSHIP:
+ // SLAVE to MASTER
+ startServices();
+ return;
+ default:
+ break;
}
- } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
- if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
- /*
- * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
- */
- takeOwnership();
- } else {
- /* Not needed notifications */
- LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
- ownershipChange, clusterSingletonGroupIdentifier);
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ switch (state) {
+ case RELEASING_OWNERSHIP:
+ // Slight cheat: if we are closing down, we just need to notify the future
+ updateState(isClosed() ? State.INITIAL : State.STANDBY);
+ return;
+ case STARTING_SERVICES:
+ case OWNER:
+ case TAKING_OWNERSHIP:
+ LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
+ return;
+ default:
+ break;
}
- } else {
- LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
+
+ break;
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ default:
+ break;
+ }
+
+ LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
+ }
+
+ private void serviceOwnershipChanged(final C ownershipChange) {
+ switch (ownershipChange.getState()) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
+ takeOwnership();
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
+ lostOwnership();
+ break;
+ default:
+ // Not needed notifications
+ LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
+ ownershipChange);
+ }
+ }
+
+ private void finishCloseIfNeeded() {
+ final SettableFuture<Void> future = closeFuture.get();
+ if (future != null) {
+ lock.lock();
+ try {
+ lockedClose(future);
+ } finally {
+ lock.unlock();
}
- } catch (final Exception e) {
- LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
- // TODO : think about close ... is it necessary?
}
}
* Help method to registered DoubleCandidateEntity. It is first step
* before the actual instance take Leadership.
*/
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void tryToTakeOwnership() {
- LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
+ private void takeOwnership() {
+ if (isClosed()) {
+ LOG.debug("Service group {} is closed, not taking ownership", identifier);
+ return;
+ }
+
+ LOG.debug("Group {} taking ownership", identifier);
+
+ updateState(State.TAKING_OWNERSHIP);
try {
- clusterLock.acquire();
- needReleaseLock = true;
- if (serviceEntityCandidateReg != null) {
- Verify.verify(asyncCloseEntityCandidateReg == null);
- asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
- } else {
- LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
- }
- } catch (final Exception e) {
- LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
- clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- } finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Service group {} failed to take ownership", identifier, e);
}
}
/*
- * Help method calls setupService method for create single cluster-wide service instance.
+ * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- private void takeOwnership() {
- LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
- try {
- clusterLock.acquire();
- needReleaseLock = true;
- if (serviceEntityCandidateReg != null) {
- Verify.verify(asyncCloseEntityCandidateReg != null);
- for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- service.instantiateServiceInstance();
- }
- hasOwnership = true;
- } else {
- LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
- }
- } catch (final RuntimeException | InterruptedException e) {
- LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
- clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- } finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ private void startServices() {
+ if (isClosed()) {
+ LOG.debug("Service group {} is closed, not starting services", identifier);
+ return;
}
+
+ LOG.debug("Service group {} starting services", identifier);
+ serviceGroup.forEach(service -> {
+ LOG.debug("Starting service {}", service);
+ try {
+ service.instantiateServiceInstance();
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+ }
+ });
+
+ LOG.debug("Service group {} services started", identifier);
+ updateState(State.OWNER);
}
/*
* The last async. step has to close DoubleCandidateRegistration reference what should initialize
* new election for DoubleCandidateEntity.
*/
- @SuppressWarnings("checkstyle:IllegalCatch")
private void lostOwnership() {
- LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
- boolean needReleaseLock = false;
- boolean needCloseProviderInstance = false;
- try {
- clusterLock.acquire();
- needReleaseLock = true;
- final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
- if (hasOwnership) {
- Verify.verify(asyncCloseEntityCandidateReg != null);
- for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- try {
- serviceCloseFutureList.add(service.closeServiceInstance());
- } catch (final RuntimeException e) {
- LOG.error("Unexpected exception while closing service: {}, resuming with next..",
- service.getIdentifier());
- }
- }
- hasOwnership = false;
- }
-
- final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
- if (serviceEntityCandidateReg != null) {
- // we don't want to remove this instance from map
- Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
- } else {
- // we have to remove this ClusterSingletonServiceGroup instance from map
- Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
- }
- /*
- * We wish to stop all possible EOS activities before we don't close
- * a close candidate registration that acts as a guard. So we don't want
- * to release Semaphore (clusterLock) before we are not fully finished.
- * Semaphore lock release has to be realized as FutureCallback after a service
- * instance has fully closed prior to relinquishing service ownership.
- */
- needReleaseLock = false;
- } catch (final InterruptedException e) {
- LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
- clusterSingletonGroupIdentifier, e);
- needCloseProviderInstance = true;
- } finally {
- closeResources(needReleaseLock, needCloseProviderInstance);
+ LOG.debug("Service group {} lost ownership in state {}", identifier, state);
+ switch (state) {
+ case REGISTERED:
+ updateState(State.STANDBY);
+ break;
+ case OWNER:
+ stopServices();
+ break;
+ case STARTING_SERVICES:
+ case STOPPING_SERVICES:
+ // No-op, as these will re-check state before proceeding
+ break;
+ case TAKING_OWNERSHIP:
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ updateState(State.STANDBY);
+ break;
+ case INITIAL:
+ case TERMINATED:
+ default:
+ LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
+ break;
}
}
- /*
- * Help method for finalization every acquired functionality
- */
- @GuardedBy("clusterLock")
- private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
- if (needCloseProvider) {
- // The Game Over for this ClusterSingletonServiceGroup instance
- if (serviceEntityCandidateReg != null) {
- serviceEntityCandidateReg.close();
- serviceEntityCandidateReg = null;
- }
- // Remove instance immediately because actual state is follower or initialization
- if (asyncCloseEntityCandidateReg == null) {
- allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
- }
- }
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ void stopServices() {
+ updateState(State.STOPPING_SERVICES);
- if (needReleaseLock) {
- clusterLock.release();
- }
- }
+ final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
+ for (final ClusterSingletonService service : serviceGroup) {
+ final ListenableFuture<Void> future;
- /*
- * Help method creates FutureCallback for suspend Future
- */
- private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
- final boolean isInCloseProcess) {
- final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
- if (throwable != null) {
- LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
- } else {
- LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
- }
- if (asyncCloseEntityCandidateReg != null) {
- asyncCloseEntityCandidateReg.close();
- asyncCloseEntityCandidateReg = null;
- }
- if (isInCloseProcess) {
- allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
+ try {
+ future = service.closeServiceInstance();
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
+ service, e);
+ continue;
}
- if (semaphore != null) {
- semaphore.release();
- }
- };
- return new FutureCallback<List<Void>>() {
+ serviceCloseFutureList.add(future);
+ }
+ Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
@Override
- public void onSuccess(final List<Void> result) {
- closeEntityCandidateRegistration.accept(null);
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Service group {} service stopping reported error", identifier, cause);
+ onServicesStopped();
}
@Override
- public void onFailure(final Throwable throwable) {
- closeEntityCandidateRegistration.accept(throwable);
+ public void onSuccess(final List<Void> nulls) {
+ onServicesStopped();
}
- };
+ });
}
+ void onServicesStopped() {
+ LOG.debug("Service group {} finished stopping services", identifier);
+ lock.lock();
+ try {
+ if (cleanupEntityReg != null) {
+ updateState(State.RELEASING_OWNERSHIP);
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ } else {
+ updateState(State.STANDBY);
+ }
+ } finally {
+ lock.unlock();
+ finishCloseIfNeeded();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
+ }
}