* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.mdsal.singleton.dom.impl;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.CheckReturnValue;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entitiy Ownership Service. Since EOS is atomic
+ * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
* in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
*
* <p>
* @param <G> the GenericEntityOwnershipListener type
* @param <S> the GenericEntityOwnershipService type
*/
-final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
+final class ClusterSingletonServiceGroupImpl<P extends HierarchicalIdentifier<P>, E extends GenericEntity<P>,
C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
- private enum State {
+
+ private enum EntityState {
/**
- * This group has been freshly allocated and has not been started yet.
+ * This entity was never registered.
*/
- INITIAL,
+ UNREGISTERED,
/**
- * Operational state. Service entity is registered, but ownership was not resolved yet.
+ * Registration exists, but we are waiting for it to resolve.
*/
REGISTERED,
/**
- * Operational state. Service entity confirmed to be follower.
- */
- STANDBY,
- /**
- * Service entity acquired. Attempting to acquire cleanup entity.
- */
- TAKING_OWNERSHIP,
- /**
- * Both entities held and user services are being started.
+ * Registration indicated we are the owner.
*/
- STARTING_SERVICES,
+ OWNED,
/**
- * Steady state. Both entities held and services have finished starting.
+ * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
+ * another partition, for example.
*/
- OWNER,
+ OWNED_JEOPARDY,
/**
- * User services are being stopped due to either loss of an entity or a shutdown.
+ * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
+ * do not need an UNOWNED_JEOPARDY state.
*/
- STOPPING_SERVICES,
+ UNOWNED,
+ }
+
+ enum ServiceState {
/**
- * We have stopped services and are now relinquishing the cleanup entity.
+ * Local service is up and running.
*/
- RELEASING_OWNERSHIP,
+ // FIXME: we should support async startup, which will require a STARTING state.
+ STARTED,
/**
- * Terminated, this group cannot be used anymore.
+ * Local service is being stopped.
*/
- TERMINATED
+ STOPPING,
}
private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
private final E serviceEntity;
private final E cleanupEntity;
- private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
- private final ReentrantLock lock = new ReentrantLock(true);
+ private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
+ // Guarded by lock
+ private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
- @GuardedBy("lock")
- private final List<ClusterSingletonService> serviceGroup;
+ // Marker for when any state changed
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
+ private volatile int dirty;
- @GuardedBy("lock")
- private State state = State.INITIAL;
+ // Simplified lock: non-reentrant, support tryLock() only
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
+ @SuppressWarnings("unused")
+ private volatile int lock;
- @GuardedBy("lock")
- private List<C> capture;
+ /*
+ * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
+ * - user calling close()
+ * - service entity ownership
+ * - cleanup entity ownership
+ * - service shutdown future
+ *
+ * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
+ * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
+ * due to boilerplate overhead.
+ *
+ * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
+ * based on recorded bits -- without explicit representation of state machine state.
+ */
+ /**
+ * Group close future. In can only go from null to non-null reference. Whenever it is non-null, it indicates that
+ * the user has closed the group and we are converging to termination.
+ */
+ // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
+ // as we perform a volatile read after unlocking -- that volatile read may easier on L1 cache.
+ // XXX: above needs a microbenchmark contention ever becomes a problem.
+ private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
+
+ /**
+ * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
+ * acquire {@link #cleanupEntity}.
+ */
+ @GuardedBy("this")
+ private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
+ /**
+ * Service (base) entity last reported state.
+ */
+ @GuardedBy("this")
+ private EntityState serviceEntityState = EntityState.UNREGISTERED;
- /* EOS Candidate Registrations */
- @GuardedBy("lock")
- private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
- @GuardedBy("lock")
+ /**
+ * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
+ * and startup.
+ */
+ @GuardedBy("this")
private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
+ /**
+ * Cleanup (owner) entity last reported state.
+ */
+ @GuardedBy("this")
+ private EntityState cleanupEntityState = EntityState.UNREGISTERED;
+
+ private volatile boolean initialized;
/**
* Class constructor. Note: last argument is reused as-is.
* @param services Services list
*/
ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
- final E closeEntity, final List<ClusterSingletonService> services) {
- Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+ final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
+ checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
this.identifier = identifier;
- this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
- this.serviceEntity = Preconditions.checkNotNull(mainEntity);
- this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
- this.serviceGroup = Preconditions.checkNotNull(services);
+ this.entityOwnershipService = requireNonNull(entityOwnershipService);
+ this.serviceEntity = requireNonNull(mainEntity);
+ this.cleanupEntity = requireNonNull(closeEntity);
+ members.addAll(services);
+
LOG.debug("Instantiated new service group for {}", identifier);
}
@VisibleForTesting
ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
final E closeEntity, final S entityOwnershipService) {
- this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+ this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of());
}
@Override
@Override
ListenableFuture<?> closeClusterSingletonGroup() {
- // Assert our future first
- final SettableFuture<Void> future = SettableFuture.create();
- final SettableFuture<Void> existing = closeFuture.getAndSet(future);
- if (existing != null) {
- return existing;
- }
-
- if (!lock.tryLock()) {
- // The lock is held, the cleanup will be finished by the owner thread
- LOG.debug("Singleton group {} cleanup postponed", identifier);
- return future;
- }
+ final ListenableFuture<?> ret = destroyGroup();
+ members.clear();
+ markDirty();
- try {
- lockedClose(future);
- } finally {
- lock.unlock();
+ if (tryLock()) {
+ reconcileState();
+ } else {
+ LOG.debug("Service group {} postponing sync on close", identifier);
}
- LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
- return future;
+ return ret;
}
private boolean isClosed() {
return closeFuture.get() != null;
}
- @GuardedBy("lock")
- private void updateState(final State newState) {
- LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
- state = Verify.verifyNotNull(newState);
- }
-
- @GuardedBy("lock")
- private void lockedClose(final SettableFuture<Void> future) {
- if (serviceEntityReg != null) {
- LOG.debug("Service group {} unregistering", identifier);
- serviceEntityReg.close();
- serviceEntityReg = null;
- }
-
- switch (state) {
- case INITIAL:
- // Not started: not much to do
- terminate(future);
- break;
- case TERMINATED:
- // Already done: no-op
- break;
- case REGISTERED:
- case STANDBY:
- LOG.debug("Service group {} terminated", identifier);
- terminate(future);
- break;
- case OWNER:
- // No-op, we will react to the loss of registration instead.
- break;
- case STOPPING_SERVICES:
- // Waiting for services. Will resume once we get notified.
- break;
- case RELEASING_OWNERSHIP:
- // Waiting for cleanup entity to flip, will resume afterwards.
- break;
- case TAKING_OWNERSHIP:
- // Abort taking of ownership and close
- LOG.debug("Service group {} aborting ownership bid", identifier);
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- updateState(State.RELEASING_OWNERSHIP);
- break;
- default:
- throw new IllegalStateException("Unhandled state " + state);
- }
- }
-
- @GuardedBy("lock")
- private void terminate(final SettableFuture<Void> future) {
- updateState(State.TERMINATED);
- Verify.verify(future.set(null));
- }
-
@Override
void initialize() throws CandidateAlreadyRegisteredException {
- LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
-
- lock.lock();
+ verify(tryLock());
try {
- Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
- state);
-
- // Catch events if they fire during this call
- capture = new ArrayList<>(0);
- serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
- state = State.REGISTERED;
-
- final List<C> captured = capture;
- capture = null;
- captured.forEach(this::lockedOwnershipChanged);
+ checkState(!initialized, "Singleton group %s was already initilized", identifier);
+ LOG.debug("Initializing service group {} with services {}", identifier, members);
+ synchronized (this) {
+ serviceEntityState = EntityState.REGISTERED;
+ serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+ initialized = true;
+ }
} finally {
- lock.unlock();
+ unlock();
}
}
private void checkNotClosed() {
- Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
- identifier);
+ checkState(!isClosed(), "Service group %s has already been closed", identifier);
}
@Override
- void registerService(final ClusterSingletonService service) {
- Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ void registerService(final ClusterSingletonServiceRegistration reg) {
+ final ClusterSingletonService service = verifyRegistration(reg);
checkNotClosed();
- LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
+ checkState(initialized, "Service group %s is not initialized yet", identifier);
- lock.lock();
- try {
- Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
- serviceGroup.add(service);
+ // First put the service
+ LOG.debug("Adding service {} to service group {}", service, identifier);
+ verify(members.add(reg));
+ markDirty();
- switch (state) {
- case OWNER:
- case STARTING_SERVICES:
- service.instantiateServiceInstance();
- break;
- default:
- break;
- }
- } finally {
- lock.unlock();
+ if (!tryLock()) {
+ LOG.debug("Service group {} delayed register of {}", identifier, reg);
+ return;
}
+
+ reconcileState();
}
- @CheckReturnValue
@Override
- boolean unregisterService(final ClusterSingletonService service) {
- Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
+ verifyRegistration(reg);
checkNotClosed();
- lock.lock();
- try {
- // There is a slight problem here, as the type does not match the list type, hence we need to tread
- // carefully.
- if (serviceGroup.size() == 1) {
- Verify.verify(serviceGroup.contains(service));
- return true;
- }
+ verify(members.remove(reg));
+ markDirty();
+ if (members.isEmpty()) {
+ // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
+ // before we start applying state, because while we do not re-enter, the user is free to do whatever,
+ // notably including registering a service with the same ID from the service shutdown hook. That
+ // registration request needs to hit the successor of this group.
+ return destroyGroup();
+ }
- Verify.verify(serviceGroup.remove(service));
- LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+ if (tryLock()) {
+ reconcileState();
+ } else {
+ LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
+ }
+ return null;
+ }
- switch (state) {
- case OWNER:
- case STARTING_SERVICES:
- service.closeServiceInstance();
- break;
- default:
- break;
- }
+ private ClusterSingletonService verifyRegistration(final ClusterSingletonServiceRegistration reg) {
+ final ClusterSingletonService service = reg.getInstance();
+ verify(identifier.equals(service.getIdentifier().getName()));
+ return service;
+ }
- return false;
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
+ private synchronized @NonNull ListenableFuture<?> destroyGroup() {
+ final SettableFuture<Void> future = SettableFuture.create();
+ if (!closeFuture.compareAndSet(null, future)) {
+ return verifyNotNull(closeFuture.get());
}
+
+ if (serviceEntityReg != null) {
+ // We are still holding the service registration, close it now...
+ LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
+ serviceEntityReg.close();
+ serviceEntityReg = null;
+ }
+
+ markDirty();
+ return future;
}
@Override
void ownershipChanged(final C ownershipChange) {
LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
- lock.lock();
- try {
- if (capture != null) {
- capture.add(ownershipChange);
- } else {
- lockedOwnershipChanged(ownershipChange);
+ synchronized (this) {
+ lockedOwnershipChanged(ownershipChange);
+ }
+
+ if (isDirty()) {
+ if (!tryLock()) {
+ LOG.debug("Service group {} postponing ownership change sync", identifier);
+ return;
}
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
+
+ reconcileState();
}
}
+ /**
+ * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
+ * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
+ *
+ * @param ownershipChange reported change
+ */
+ @Holding("this")
private void lockedOwnershipChanged(final C ownershipChange) {
- if (ownershipChange.inJeopardy()) {
- LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
- lostOwnership();
- return;
- }
-
final E entity = ownershipChange.getEntity();
if (serviceEntity.equals(entity)) {
- serviceOwnershipChanged(ownershipChange);
+ serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+ markDirty();
} else if (cleanupEntity.equals(entity)) {
- cleanupCandidateOwnershipChanged(ownershipChange);
+ cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+ markDirty();
} else {
LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
}
}
- private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
- switch (ownershipChange.getState()) {
+ @Holding("this")
+ private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+ if (jeopardy) {
+ switch (state) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
+ cleanupEntityState = EntityState.OWNED_JEOPARDY;
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
+ cleanupEntityState = EntityState.UNOWNED;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
+ }
+
+ return;
+ }
+
+ if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
+ // Pair info message with previous jeopardy
+ LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
+ }
+
+ switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
- switch (state) {
- case TAKING_OWNERSHIP:
- // SLAVE to MASTER
- startServices();
- return;
- default:
- break;
- }
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ cleanupEntityState = EntityState.OWNED;
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- switch (state) {
- case RELEASING_OWNERSHIP:
- // Slight cheat: if we are closing down, we just need to notify the future
- updateState(isClosed() ? State.INITIAL : State.STANDBY);
- return;
- case STARTING_SERVICES:
- case OWNER:
- case TAKING_OWNERSHIP:
- LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
- return;
- default:
- break;
- }
-
- break;
- case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- case REMOTE_OWNERSHIP_CHANGED:
case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ cleanupEntityState = EntityState.UNOWNED;
+ break;
default:
+ LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
break;
}
-
- LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
}
- private void serviceOwnershipChanged(final C ownershipChange) {
- switch (ownershipChange.getState()) {
+ @Holding("this")
+ private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
+ if (jeopardy) {
+ LOG.info("Service group {} service entity ownership uncertain", identifier);
+ switch (state) {
+ case LOCAL_OWNERSHIP_GRANTED:
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ serviceEntityState = EntityState.OWNED_JEOPARDY;
+ break;
+ case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
+ case LOCAL_OWNERSHIP_LOST_NO_OWNER:
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ serviceEntityState = EntityState.UNOWNED;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
+ }
+ return;
+ }
+
+ if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
+ // Pair info message with previous jeopardy
+ LOG.info("Service group {} service entity ownership ascertained", identifier);
+ }
+
+ switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
- // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
- takeOwnership();
+ case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
+ LOG.debug("Service group {} acquired service entity ownership", identifier);
+ serviceEntityState = EntityState.OWNED;
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
- lostOwnership();
+ case REMOTE_OWNERSHIP_CHANGED:
+ case REMOTE_OWNERSHIP_LOST_NO_OWNER:
+ LOG.debug("Service group {} lost service entity ownership", identifier);
+ serviceEntityState = EntityState.UNOWNED;
break;
default:
- // Not needed notifications
- LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
- ownershipChange);
+ LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
}
}
- private void finishCloseIfNeeded() {
- final SettableFuture<Void> future = closeFuture.get();
- if (future != null) {
- lock.lock();
+ // has to be called with lock asserted, which will be released prior to returning
+ private void reconcileState() {
+ // Always check if there is any state change to be applied.
+ while (true) {
try {
- lockedClose(future);
+ if (conditionalClean()) {
+ tryReconcileState();
+ }
} finally {
- lock.unlock();
+ // We may have ran a round of reconciliation, but the either one of may have happened asynchronously:
+ // - registration
+ // - unregistration
+ // - service future completed
+ // - entity state changed
+ //
+ // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
+ // be dirty again. This closes the following race condition:
+ //
+ // A: runs these checks holding the lock
+ // B: modifies them, fails to acquire lock
+ // A: releases lock -> noone takes care of reconciliation
+
+ unlock();
}
- }
- }
- /*
- * Help method to registered DoubleCandidateEntity. It is first step
- * before the actual instance take Leadership.
- */
- private void takeOwnership() {
- if (isClosed()) {
- LOG.debug("Service group {} is closed, not taking ownership", identifier);
- return;
- }
+ if (isDirty()) {
+ if (tryLock()) {
+ LOG.debug("Service group {} re-running reconciliation", identifier);
+ continue;
+ }
- LOG.debug("Group {} taking ownership", identifier);
+ LOG.debug("Service group {} will be reconciled by someone else", identifier);
+ } else {
+ LOG.debug("Service group {} is completely reconciled", identifier);
+ }
- updateState(State.TAKING_OWNERSHIP);
- try {
- cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
- } catch (CandidateAlreadyRegisteredException e) {
- LOG.error("Service group {} failed to take ownership", identifier, e);
+ break;
}
}
- /*
- * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void startServices() {
- if (isClosed()) {
- LOG.debug("Service group {} is closed, not starting services", identifier);
- return;
+ private void serviceTransitionCompleted() {
+ markDirty();
+ if (tryLock()) {
+ reconcileState();
}
+ }
- LOG.debug("Service group {} starting services", identifier);
- serviceGroup.forEach(service -> {
- LOG.debug("Starting service {}", service);
- try {
- service.instantiateServiceInstance();
- } catch (Exception e) {
- LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+ // Has to be called with lock asserted
+ private void tryReconcileState() {
+ // First take a safe snapshot of current state on which we will base our decisions.
+ final Set<ClusterSingletonServiceRegistration> localMembers;
+ final boolean haveCleanup;
+ final boolean haveService;
+ synchronized (this) {
+ if (serviceEntityReg != null) {
+ switch (serviceEntityState) {
+ case OWNED:
+ case OWNED_JEOPARDY:
+ haveService = true;
+ break;
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ haveService = false;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+ }
+ } else {
+ haveService = false;
}
- });
- LOG.debug("Service group {} services started", identifier);
- updateState(State.OWNER);
- }
+ if (haveService && cleanupEntityReg == null) {
+ // We have the service entity but have not registered for cleanup entity. Do that now and retry.
+ LOG.debug("Service group {} registering cleanup entity", identifier);
+ try {
+ cleanupEntityState = EntityState.REGISTERED;
+ cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
+ if (serviceEntityReg != null) {
+ serviceEntityReg.close();
+ serviceEntityReg = null;
+ }
+ }
+ markDirty();
+ return;
+ }
- /*
- * Help method calls suspendService method for stop this single cluster-wide service instance.
- * The last async. step has to close DoubleCandidateRegistration reference what should initialize
- * new election for DoubleCandidateEntity.
- */
- private void lostOwnership() {
- LOG.debug("Service group {} lost ownership in state {}", identifier, state);
- switch (state) {
- case REGISTERED:
- updateState(State.STANDBY);
- break;
- case OWNER:
- stopServices();
- break;
- case STARTING_SERVICES:
- case STOPPING_SERVICES:
- // No-op, as these will re-check state before proceeding
- break;
- case TAKING_OWNERSHIP:
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- updateState(State.STANDBY);
- break;
- case INITIAL:
- case TERMINATED:
- default:
- LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
- break;
+ if (cleanupEntityReg != null) {
+ switch (cleanupEntityState) {
+ case OWNED:
+ haveCleanup = true;
+ break;
+ case OWNED_JEOPARDY:
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ haveCleanup = false;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+ }
+ } else {
+ haveCleanup = false;
+ }
+
+ localMembers = ImmutableSet.copyOf(members);
}
- }
- @SuppressWarnings("checkstyle:IllegalCatch")
- void stopServices() {
- updateState(State.STOPPING_SERVICES);
+ if (haveService && haveCleanup) {
+ ensureServicesStarting(localMembers);
+ return;
+ }
- final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
- for (final ClusterSingletonService service : serviceGroup) {
- final ListenableFuture<Void> future;
+ ensureServicesStopping();
- try {
- future = service.closeServiceInstance();
- } catch (Exception e) {
- LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
- service, e);
- continue;
+ if (!haveService && services.isEmpty()) {
+ LOG.debug("Service group {} has no running services", identifier);
+ final boolean canFinishClose;
+ synchronized (this) {
+ if (cleanupEntityReg != null) {
+ LOG.debug("Service group {} releasing cleanup entity", identifier);
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ }
+
+ switch (cleanupEntityState) {
+ case OWNED:
+ case OWNED_JEOPARDY:
+ case REGISTERED:
+ // When we are registered we need to wait for registration to resolve, otherwise
+ // the notification could be routed to the next incarnation of this group -- which could be
+ // confused by the fact it is not registered, but receives, for example, OWNED notification.
+ canFinishClose = false;
+ break;
+ case UNOWNED:
+ case UNREGISTERED:
+ canFinishClose = true;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
+ }
}
- serviceCloseFutureList.add(future);
+ if (canFinishClose) {
+ final SettableFuture<Void> localFuture = closeFuture.get();
+ if (localFuture != null && !localFuture.isDone()) {
+ LOG.debug("Service group {} completing termination", identifier);
+ localFuture.set(null);
+ }
+ }
}
+ }
- Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
- @Override
- public void onFailure(final Throwable cause) {
- LOG.warn("Service group {} service stopping reported error", identifier, cause);
- onServicesStopped();
+ // Has to be called with lock asserted
+ @SuppressWarnings("illegalCatch")
+ private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
+ LOG.debug("Service group {} starting services", identifier);
+
+ // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
+ // example when this method is executed as part of unregisterService() call. In that case we need to ensure
+ // services in the list are stopping
+ final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+ while (it.hasNext()) {
+ final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+ final ClusterSingletonServiceRegistration reg = entry.getKey();
+ if (!localConfig.contains(reg)) {
+ final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
+ if (newInfo != null) {
+ entry.setValue(newInfo);
+ } else {
+ it.remove();
+ }
}
+ }
+
+ // Now make sure member services are being juggled around
+ for (ClusterSingletonServiceRegistration reg : localConfig) {
+ if (!services.containsKey(reg)) {
+ final ClusterSingletonService service = reg.getInstance();
+ LOG.debug("Starting service {}", service);
+
+ try {
+ service.instantiateServiceInstance();
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
+ e);
+ continue;
+ }
- @Override
- public void onSuccess(final List<Void> nulls) {
- onServicesStopped();
+ services.put(reg, ServiceInfo.started());
}
- });
+ }
}
- void onServicesStopped() {
- LOG.debug("Service group {} finished stopping services", identifier);
- lock.lock();
- try {
- if (cleanupEntityReg != null) {
- updateState(State.RELEASING_OWNERSHIP);
- cleanupEntityReg.close();
- cleanupEntityReg = null;
+ // Has to be called with lock asserted
+ private void ensureServicesStopping() {
+ final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+ while (it.hasNext()) {
+ final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+ final ServiceInfo newInfo = ensureStopping(entry.getKey(), entry.getValue());
+ if (newInfo != null) {
+ entry.setValue(newInfo);
} else {
- updateState(State.STANDBY);
+ it.remove();
}
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
}
}
+ @SuppressWarnings("illegalCatch")
+ private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
+ switch (info.getState()) {
+ case STARTED:
+ final ClusterSingletonService service = reg.getInstance();
+
+ LOG.debug("Service group {} stopping service {}", identifier, service);
+ final @NonNull ListenableFuture<?> future;
+ try {
+ future = verifyNotNull(service.closeServiceInstance());
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
+ e);
+ return null;
+ }
+
+ Futures.addCallback(future, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object result) {
+ LOG.debug("Service group {} service {} stopped successfully", identifier, service);
+ serviceTransitionCompleted();
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
+ serviceTransitionCompleted();
+ }
+ }, MoreExecutors.directExecutor());
+ return info.toState(ServiceState.STOPPING, future);
+ case STOPPING:
+ if (info.getFuture().isDone()) {
+ LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
+ return null;
+ }
+ return info;
+ default:
+ throw new IllegalStateException("Unhandled state " + info.getState());
+ }
+ }
+
+ private void markDirty() {
+ dirty = 1;
+ }
+
+ private boolean isDirty() {
+ return dirty != 0;
+ }
+
+ private boolean conditionalClean() {
+ return DIRTY_UPDATER.compareAndSet(this, 1, 0);
+ }
+
+ private boolean tryLock() {
+ return LOCK_UPDATER.compareAndSet(this, 0, 1);
+ }
+
+ private boolean unlock() {
+ verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
+ return true;
+ }
+
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
+ return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();
}
}