import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
* @param <G> the GenericEntityOwnershipListener type
* @param <S> the GenericEntityOwnershipService type
*/
+@ThreadSafe
final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
- private enum State {
+
+ /**
+ * Last known state of a single entity registration, as recorded from reported ownership changes. One value is
+ * tracked for the service entity and one for the cleanup entity.
+ */
+ private enum EntityState {
/**
- * This group has been freshly allocated and has not been started yet.
+ * This entity was never registered.
*/
- INITIAL,
+ UNREGISTERED,
/**
- * Operational state. Service entity is registered, but ownership was not resolved yet.
+ * Registration exists, but we are waiting for it to resolve.
*/
REGISTERED,
/**
- * Operational state. Service entity confirmed to be follower.
- */
- STANDBY,
- /**
- * Service entity acquired. Attempting to acquire cleanup entity.
+ * Registration indicated we are the owner.
*/
- TAKING_OWNERSHIP,
+ OWNED,
/**
- * Both entities held and user services are being started.
+ * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
+ * another partition, for example.
*/
- STARTING_SERVICES,
+ OWNED_JEOPARDY,
/**
- * Steady state. Both entities held and services have finished starting.
+ * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
+ * do not need an UNOWNED_JEOPARDY state.
*/
- OWNER,
+ UNOWNED,
+ }
+
+ /**
+ * State of the services instantiated locally by this group.
+ */
+ private enum ServiceState {
/**
- * User services are being stopped due to either loss of an entity or a shutdown.
+ * Local services are stopped.
*/
- STOPPING_SERVICES,
+ STOPPED,
/**
- * We have stopped services and are now relinquishing the cleanup entity.
+ * Local services are up and running.
*/
- RELEASING_OWNERSHIP,
+ // FIXME: we should support async startup, which will require a STARTING state.
+ STARTED,
/**
- * Terminated, this group cannot be used anymore.
+ * Local services are being stopped.
+ * Entered when shutdown of the services is initiated and left once the combined shutdown future completes.
*/
- TERMINATED
+ STOPPING,
}
private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
private final E serviceEntity;
private final E cleanupEntity;
- private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
private final ReentrantLock lock = new ReentrantLock(true);
@GuardedBy("lock")
private final List<ClusterSingletonService> serviceGroup;
+ /*
+ * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
+ * - user calling close()
+ * - service entity ownership
+ * - cleanup entity ownership
+ * - service shutdown future
+ *
+ * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
+ * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
+ * due to boilerplate overhead.
+ *
+ * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
+ * based on recorded bits -- without explicit representation of state machine state.
+ */
+ /**
+ * Group close future. It can only go from null to non-null reference. Whenever it is non-null, it indicates that
+ * the user has closed the group and we are converging to termination.
+ */
+ // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
+ // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
+ // XXX: above needs a microbenchmark if contention ever becomes a problem.
+ private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
+
+ /**
+ * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
+ * acquire {@link #cleanupEntity}.
+ */
@GuardedBy("lock")
- private State state = State.INITIAL;
+ private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
+ /**
+ * Service (base) entity last reported state.
+ */
+ @GuardedBy("lock")
+ private EntityState serviceEntityState = EntityState.UNREGISTERED;
+ /**
+ * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
+ * and startup.
+ */
@GuardedBy("lock")
- private List<C> capture;
+ private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
+ /**
+ * Cleanup (owner) entity last reported state.
+ */
+ @GuardedBy("lock")
+ private EntityState cleanupEntityState = EntityState.UNREGISTERED;
- /* EOS Candidate Registrations */
+ /**
+ * Optional event capture list. This field is initialized when we interact with entity ownership service, to capture
+ * events reported during EOS method invocation -- like immediate acquisition of entity when we register it. This
+ * prevents bugs from recursion.
+ */
@GuardedBy("lock")
- private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
+ private List<C> capture = null;
+
+ /**
+ * State of local services.
+ */
@GuardedBy("lock")
- private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
+ private ServiceState localServicesState = ServiceState.STOPPED;
/**
* Class constructor. Note: last argument is reused as-is.
}
+ /**
+ * Start capturing ownership events into a fresh {@link #capture} list. Fails verification if a capture is already
+ * in progress. Each call must be paired with a subsequent {@link #endCapture()}.
+ */
@GuardedBy("lock")
- private void updateState(final State newState) {
- LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
- state = Verify.verifyNotNull(newState);
+ private void startCapture() {
+ // Guava's Verify substitutes %s placeholders, SLF4J-style {} would be left in the message verbatim
+ Verify.verify(capture == null, "Service group %s is already capturing events %s", identifier, capture);
+ capture = new ArrayList<>(0);
+ LOG.debug("Service group {} started capturing events", identifier);
+ }
+
+ /**
+ * Stop capturing ownership events and hand the captured events over to the caller. The capture list is cleared
+ * before returning, so the caller may start a new capture while processing the returned events.
+ *
+ * @return events captured since the matching {@link #startCapture()}, never null
+ */
+ @GuardedBy("lock")
+ private List<C> endCapture() {
+ final List<C> ret = Verify.verifyNotNull(capture, "Service group %s is not currently capturing", identifier);
+ capture = null;
+ LOG.debug("Service group {} finished capturing events, {} events to process", identifier, ret.size());
+ return ret;
}
+ /**
+ * Attempt to finish group shutdown with the lock held. The steps are evaluated in order and the method returns
+ * early, leaving the future incomplete, as soon as one of them is still pending:
+ * <ol>
+ * <li>unregister the service entity, if still registered</li>
+ * <li>wait until the service entity is no longer reported as owned</li>
+ * <li>stop local services</li>
+ * <li>unregister the cleanup entity, if still registered</li>
+ * <li>wait until the cleanup entity is no longer reported as owned</li>
+ * </ol>
+ * Once none of the steps is pending, the supplied future is completed.
+ *
+ * @param future close future to complete when termination has been reached
+ */
@GuardedBy("lock")
private void lockedClose(final SettableFuture<Void> future) {
if (serviceEntityReg != null) {
- LOG.debug("Service group {} unregistering", identifier);
+ // We are still holding the service registration, close it now...
+ LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
+ startCapture();
serviceEntityReg.close();
serviceEntityReg = null;
+
+ // This can potentially mutate our state, so all previous checks need to be re-validated.
+ endCapture().forEach(this::lockedOwnershipChanged);
}
- switch (state) {
- case INITIAL:
- // Not started: not much to do
- terminate(future);
- break;
- case TERMINATED:
- // Already done: no-op
- break;
+ // Now check service entity state: if it is still owned, we need to wait until it is acknowledged as
+ // unregistered.
+ switch (serviceEntityState) {
case REGISTERED:
- case STANDBY:
- LOG.debug("Service group {} terminated", identifier);
- terminate(future);
- break;
- case OWNER:
- // No-op, we will react to the loss of registration instead.
- break;
- case STOPPING_SERVICES:
- // Waiting for services. Will resume once we get notified.
+ case UNOWNED:
+ case UNREGISTERED:
+ // We have either successfully shut down, or have never started up, proceed with termination
break;
- case RELEASING_OWNERSHIP:
- // Waiting for cleanup entity to flip, will resume afterwards.
- break;
- case TAKING_OWNERSHIP:
- // Abort taking of ownership and close
- LOG.debug("Service group {} aborting ownership bid", identifier);
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- updateState(State.RELEASING_OWNERSHIP);
+ case OWNED:
+ // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
+ // when that loss is reported.
+ LOG.debug("Service group {} is still owned, postponing termination", identifier);
+ return;
+ case OWNED_JEOPARDY:
+ // This is a significant event, as it relates to cluster split/join operations, operators need to know
+ // we are waiting for cluster join event.
+ LOG.info("Service group {} is still owned with split cluster, postponing termination", identifier);
+ return;
+ default:
+ throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+ }
+
+ // We do not own service entity state: we need to ensure services are stopped.
+ if (stopServices()) {
+ LOG.debug("Service group {} started shutting down services, postponing termination", identifier);
+ return;
+ }
+
+ // Local cleanup completed, release cleanup entity if needed
+ if (cleanupEntityReg != null) {
+ LOG.debug("Service group {} unregistering cleanup entity {}", identifier, cleanupEntity);
+ startCapture();
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+
+ // This can potentially mutate our state, so all previous checks need to be re-validated.
+ endCapture().forEach(this::lockedOwnershipChanged);
+ }
+
+ switch (cleanupEntityState) {
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ // We have either successfully shut down, or have never started up, proceed with termination
break;
+ case OWNED:
+ // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
+ // when that loss is reported.
+ LOG.debug("Service group {} is still owns cleanup, postponing termination", identifier);
+ return;
+ case OWNED_JEOPARDY:
+ // This is a significant event, as it relates to cluster split/join operations, operators need to know
+ // we are waiting for cluster join event.
+ LOG.info("Service group {} is still owns cleanup with split cluster, postponing termination",
+ identifier);
+ return;
default:
- throw new IllegalStateException("Unhandled state " + state);
+ // Report the value actually being switched on, not the service entity state
+ throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
}
- }
- @GuardedBy("lock")
- private void terminate(final SettableFuture<Void> future) {
- updateState(State.TERMINATED);
- Verify.verify(future.set(null));
+ // No registrations left and no service operations pending, we are done
+ LOG.debug("Service group {} completing termination", identifier);
+ future.set(null);
}
@Override
void initialize() throws CandidateAlreadyRegisteredException {
- LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
-
lock.lock();
try {
- Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
- state);
+ Preconditions.checkState(serviceEntityState == EntityState.UNREGISTERED,
+ "Singleton group %s was already initilized", identifier);
- // Catch events if they fire during this call
- capture = new ArrayList<>(0);
+ LOG.debug("Initializing service group {} with services {}", identifier, serviceGroup);
+ startCapture();
serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
- state = State.REGISTERED;
-
- final List<C> captured = capture;
- capture = null;
- captured.forEach(this::lockedOwnershipChanged);
+ serviceEntityState = EntityState.REGISTERED;
+ endCapture().forEach(this::lockedOwnershipChanged);
} finally {
lock.unlock();
}
Verify.verify(identifier.equals(service.getIdentifier().getValue()));
checkNotClosed();
- LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
-
lock.lock();
try {
- Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
+ Preconditions.checkState(serviceEntityState != EntityState.UNREGISTERED,
+ "Service group %s is not initialized yet", identifier);
+
+ LOG.debug("Adding service {} to service group {}", service, identifier);
serviceGroup.add(service);
- switch (state) {
- case OWNER:
- case STARTING_SERVICES:
+ switch (localServicesState) {
+ case STARTED:
+ LOG.debug("Service group {} starting late-registered service {}", identifier, service);
service.instantiateServiceInstance();
break;
- default:
+ case STOPPED:
+ case STOPPING:
break;
+ default:
+ throw new IllegalStateException("Unhandled local services state " + localServicesState);
}
} finally {
lock.unlock();
+ finishCloseIfNeeded();
}
}
Verify.verify(serviceGroup.remove(service));
LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
- switch (state) {
- case OWNER:
- case STARTING_SERVICES:
+ switch (localServicesState) {
+ case STARTED:
+ LOG.warn("Service group {} stopping unregistered service {}", identifier, service);
service.closeServiceInstance();
break;
- default:
+ case STOPPED:
+ case STOPPING:
break;
+ default:
+ throw new IllegalStateException("Unhandled local services state " + localServicesState);
}
return false;
}
}
+ /**
+ * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
+ * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
+ *
+ * <p>
+ * The change is dispatched by entity: changes of {@link #serviceEntity} and {@link #cleanupEntity} are passed,
+ * together with their jeopardy flag, to the respective handler. A change for any other entity is logged and
+ * otherwise ignored.
+ *
+ * @param ownershipChange reported change
+ */
+ @GuardedBy("lock")
private void lockedOwnershipChanged(final C ownershipChange) {
- if (ownershipChange.inJeopardy()) {
- LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
- lostOwnership();
- return;
- }
-
final E entity = ownershipChange.getEntity();
if (serviceEntity.equals(entity)) {
- serviceOwnershipChanged(ownershipChange);
+ serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
} else if (cleanupEntity.equals(entity)) {
- cleanupCandidateOwnershipChanged(ownershipChange);
+ cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
} else {
LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
}
}
+ /**
+ * Record a reported change of cleanup entity ownership and react to it.
+ *
+ * <p>
+ * With jeopardy set, local ownership is recorded as {@code OWNED_JEOPARDY} and every other change as
+ * {@code UNOWNED}; in both cases local services are asked to stop. Without jeopardy, local ownership is recorded
+ * as {@code OWNED} and services are started if they are currently stopped, loss of local ownership is recorded as
+ * {@code UNOWNED} and services are asked to stop, and remote ownership changes are only recorded as
+ * {@code UNOWNED}. Reports of local ownership are ignored when the cleanup entity is not registered.
+ *
+ * @param state reported ownership change state
+ * @param jeopardy true if the change was reported with the jeopardy flag set
+ * @throws IllegalStateException if an unknown state is reported with jeopardy set, or if local services are in an
+ *         unknown state
+ */
- private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
- switch (ownershipChange.getState()) {
+ private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+ if (jeopardy) {
+ switch (state) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ if (cleanupEntityReg == null) {
+ LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
+ return;
+ }
+
+ LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
+ cleanupEntityState = EntityState.OWNED_JEOPARDY;
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
+ cleanupEntityState = EntityState.UNOWNED;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
+ }
+
+ // Ownership of the cleanup entity is not certain, local services must not keep running
+ stopServices();
+ return;
+ }
+
+ if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
+ // Pair info message with previous jeopardy
+ LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
+ }
+
+ switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
- switch (state) {
- case TAKING_OWNERSHIP:
- // SLAVE to MASTER
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ if (cleanupEntityReg == null) {
+ LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
+ return;
+ }
+
+ cleanupEntityState = EntityState.OWNED;
+ switch (localServicesState) {
+ case STARTED:
+ LOG.debug("Service group {} already has local services running", identifier);
+ break;
+ case STOPPED:
startServices();
- return;
- default:
break;
+ case STOPPING:
+ LOG.debug("Service group {} has local services stopping, postponing startup", identifier);
+ break;
+ default:
+ throw new IllegalStateException("Unhandled local services state " + localServicesState);
}
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- switch (state) {
- case RELEASING_OWNERSHIP:
- // Slight cheat: if we are closing down, we just need to notify the future
- updateState(isClosed() ? State.INITIAL : State.STANDBY);
- return;
- case STARTING_SERVICES:
- case OWNER:
- case TAKING_OWNERSHIP:
- LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
- return;
- default:
- break;
- }
-
+ cleanupEntityState = EntityState.UNOWNED;
+ stopServices();
break;
- case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- case REMOTE_OWNERSHIP_CHANGED:
case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ // We were not the owner before this change, so only the recorded state is updated
+ cleanupEntityState = EntityState.UNOWNED;
+ break;
default:
+ LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
break;
}
-
- LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
}
+ /**
+ * Record a reported change of service entity ownership and react to it.
+ *
+ * <p>
+ * With jeopardy set, only the state is recorded: local ownership as {@code OWNED_JEOPARDY}, anything else as
+ * {@code UNOWNED}. Without jeopardy, local ownership is recorded as {@code OWNED} and followed by a bid for the
+ * cleanup entity; loss of local ownership is recorded as {@code UNOWNED}, local services are asked to stop and,
+ * unless they are still stopping, the cleanup entity is unregistered; remote ownership changes are only recorded
+ * as {@code UNOWNED}. Reports of local ownership are ignored when the service entity is not registered.
+ *
+ * @param state reported ownership change state
+ * @param jeopardy true if the change was reported with the jeopardy flag set
+ * @throws IllegalStateException if an unknown state is reported with jeopardy set
+ */
- private void serviceOwnershipChanged(final C ownershipChange) {
- switch (ownershipChange.getState()) {
+ private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+ if (jeopardy) {
+ LOG.info("Service group {} service entity ownership uncertain", identifier);
+
+ // Service entity ownership is uncertain, which means we want to record the state, but we do not want
+ // to stop local services nor do anything with the cleanup entity.
+ switch (state) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ if (serviceEntityReg == null) {
+ LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
+ return;
+ }
+
+ serviceEntityState = EntityState.OWNED_JEOPARDY;
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ serviceEntityState = EntityState.UNOWNED;
+ break;
+ default:
+ // This handler deals with the service entity, name it correctly in the diagnostic
+ throw new IllegalStateException("Unhandled service entity jeopardy change " + state);
+ }
+ return;
+ }
+
+ if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
+ // Pair info message with previous jeopardy
+ LOG.info("Service group {} service entity ownership ascertained", identifier);
+ }
+
+ switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
- // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ if (serviceEntityReg == null) {
+ LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
+ return;
+ }
+
+ serviceEntityState = EntityState.OWNED;
takeOwnership();
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
- lostOwnership();
+ LOG.debug("Service group {} lost service entity ownership", identifier);
+ serviceEntityState = EntityState.UNOWNED;
+ if (stopServices()) {
+ LOG.debug("Service group {} already stopping services, postponing cleanup", identifier);
+ return;
+ }
+
+ if (cleanupEntityReg != null) {
+ // Capture events reported while unregistering and process them afterwards, consistent with the other
+ // places where a candidate registration is closed.
+ startCapture();
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ endCapture().forEach(this::lockedOwnershipChanged);
+ }
+ break;
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ // No need to react, just update the state
+ serviceEntityState = EntityState.UNOWNED;
break;
default:
- // Not needed notifications
- LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
- ownershipChange);
+ LOG.warn("Service group {} ignoring unhandled service entity change {}", identifier, state);
+ break;
}
}
*/
private void takeOwnership() {
if (isClosed()) {
- LOG.debug("Service group {} is closed, not taking ownership", identifier);
+ LOG.debug("Service group {} is closed, skipping cleanup ownership bid", identifier);
return;
}
- LOG.debug("Group {} taking ownership", identifier);
+ LOG.debug("Service group {} registering cleanup entity", identifier);
- updateState(State.TAKING_OWNERSHIP);
+ // Events reported while the registration call is in progress are captured and processed below
+ startCapture();
try {
cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+ cleanupEntityState = EntityState.REGISTERED;
} catch (CandidateAlreadyRegisteredException e) {
+ // Registration failed: cleanupEntityReg and cleanupEntityState are left as they were
LOG.error("Service group {} failed to take ownership", identifier, e);
}
+
+ // Reached on both the success and the CandidateAlreadyRegisteredException path, so the capture is ended either way
+ endCapture().forEach(this::lockedOwnershipChanged);
}
/*
}
});
+ localServicesState = ServiceState.STARTED;
LOG.debug("Service group {} services started", identifier);
- updateState(State.OWNER);
}
- /*
- * Help method calls suspendService method for stop this single cluster-wide service instance.
- * The last async. step has to close DoubleCandidateRegistration reference what should initialize
- * new election for DoubleCandidateEntity.
- */
- private void lostOwnership() {
- LOG.debug("Service group {} lost ownership in state {}", identifier, state);
- switch (state) {
- case REGISTERED:
- updateState(State.STANDBY);
- break;
- case OWNER:
- stopServices();
- break;
- case STARTING_SERVICES:
- case STOPPING_SERVICES:
- // No-op, as these will re-check state before proceeding
- break;
- case TAKING_OWNERSHIP:
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- updateState(State.STANDBY);
- break;
- case INITIAL:
- case TERMINATED:
+ /**
+ * Ensure local services are stopped or stopping. If services are started, every service in the group is asked to
+ * close and {@link #onServicesStopped()} is invoked once the combined close future completes, whether it succeeds
+ * or fails. A service which throws from its close call is logged and left out of the combined future.
+ *
+ * @return true if services are still stopping when this method returns, false if they are stopped
+ * @throws IllegalStateException if local services are in an unknown state
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ boolean stopServices() {
+ switch (localServicesState) {
+ case STARTED:
+ localServicesState = ServiceState.STOPPING;
+
+ final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
+ for (final ClusterSingletonService service : serviceGroup) {
+ final ListenableFuture<Void> future;
+
+ try {
+ future = service.closeServiceInstance();
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
+ service, e);
+ continue;
+ }
+
+ serviceCloseFutureList.add(future);
+ }
+
+ LOG.debug("Service group {} initiated service shutdown", identifier);
+
+ // NOTE(review): Futures.allAsList() is documented to fail when any input fails, so onFailure() may
+ // presumably run while other services are still closing -- confirm this is acceptable.
+ Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Service group {} service stopping reported error", identifier, cause);
+ onServicesStopped();
+ }
+
+ @Override
+ public void onSuccess(final List<Void> nulls) {
+ onServicesStopped();
+ }
+ }, MoreExecutors.directExecutor());
+
+ // With a direct executor the callback may already have run and moved us to STOPPED, hence the re-check
+ return localServicesState == ServiceState.STOPPING;
+ case STOPPED:
+ LOG.debug("Service group {} has already stopped services", identifier);
+ return false;
+ case STOPPING:
+ LOG.debug("Service group {} is already stopping services", identifier);
+ return true;
default:
- LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
- break;
+ throw new IllegalStateException("Unhandled local services state " + localServicesState);
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- void stopServices() {
- updateState(State.STOPPING_SERVICES);
-
- final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
- for (final ClusterSingletonService service : serviceGroup) {
- final ListenableFuture<Void> future;
+ void onServicesStopped() {
+ LOG.debug("Service group {} finished stopping services", identifier);
+ lock.lock();
+ try {
+ localServicesState = ServiceState.STOPPED;
- try {
- future = service.closeServiceInstance();
- } catch (Exception e) {
- LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
- service, e);
- continue;
+ if (isClosed()) {
+ LOG.debug("Service group {} closed, skipping service restart check", identifier);
+ return;
}
- serviceCloseFutureList.add(future);
- }
-
- Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
- @Override
- public void onFailure(final Throwable cause) {
- LOG.warn("Service group {} service stopping reported error", identifier, cause);
- onServicesStopped();
+ // If we lost the service entity while services were stopping, we need to unregister cleanup entity
+ switch (serviceEntityState) {
+ case OWNED:
+ case OWNED_JEOPARDY:
+ // No need to churn cleanup entity
+ break;
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ if (cleanupEntityReg != null) {
+ startCapture();
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ endCapture().forEach(this::lockedOwnershipChanged);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unhandled service entity state" + serviceEntityState);
}
- @Override
- public void onSuccess(final List<Void> nulls) {
- onServicesStopped();
+ if (cleanupEntityReg == null) {
+ LOG.debug("Service group {} does not have cleanup entity registered, skipping restart check",
+ identifier);
+ return;
}
- }, MoreExecutors.directExecutor());
- }
- void onServicesStopped() {
- LOG.debug("Service group {} finished stopping services", identifier);
- lock.lock();
- try {
- if (cleanupEntityReg != null) {
- updateState(State.RELEASING_OWNERSHIP);
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- } else {
- updateState(State.STANDBY);
+ // Double-check if the services should really be down
+ switch (cleanupEntityState) {
+ case OWNED:
+ // We have finished stopping services, but we own cleanup, e.g. we should start them again.
+ startServices();
+ return;
+ case UNOWNED:
+ case OWNED_JEOPARDY:
+ case REGISTERED:
+ case UNREGISTERED:
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity state" + cleanupEntityState);
}
} finally {
lock.unlock();
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
+ // Only the identifier is reported, the former single 'state' field is no longer part of the output
+ return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();
}
}