* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.mdsal.singleton.dom.impl;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
UNOWNED,
}
- private enum ServiceState {
- /**
- * Local services are stopped.
- */
- STOPPED,
+ enum ServiceState { // per-service lifecycle state; a stopped service is simply no longer tracked
/**
- * Local services are up and running.
+ * Local service is up and running.
*/
// FIXME: we should support async startup, which will require a STARTING state.
STARTED,
/**
- * Local services are being stopped.
+ * Local service is being stopped.
*/
STOPPING,
}
private final E serviceEntity;
private final E cleanupEntity;
- private final ReentrantLock lock = new ReentrantLock(true);
+ private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
+ // Guarded by lock
+ private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
- @GuardedBy("lock")
- private final List<ClusterSingletonServiceRegistration> serviceGroup;
+ // Marker for when any state changed
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
+ private volatile int dirty;
+
+ // Simplified lock: non-reentrant, support tryLock() only
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
+ @SuppressWarnings("unused")
+ private volatile int lock;
/*
* State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
* Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
* acquire {@link #cleanupEntity}.
*/
- @GuardedBy("lock")
+ @GuardedBy("this")
private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
/**
* Service (base) entity last reported state.
*/
- @GuardedBy("lock")
+ @GuardedBy("this")
private EntityState serviceEntityState = EntityState.UNREGISTERED;
/**
* Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
* and startup.
*/
- @GuardedBy("lock")
+ @GuardedBy("this")
private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
/**
* Cleanup (owner) entity last reported state.
*/
- @GuardedBy("lock")
+ @GuardedBy("this")
private EntityState cleanupEntityState = EntityState.UNREGISTERED;
- /**
- * Optional event capture list. This field is initialized when we interact with entity ownership service, to capture
- * events reported during EOS method invocation -- like immediate acquisition of entity when we register it. This
- * prevents bugs from recursion.
- */
- @GuardedBy("lock")
- private List<C> capture = null;
-
- /**
- * State of local services.
- */
- @GuardedBy("lock")
- private ServiceState localServicesState = ServiceState.STOPPED;
+ private volatile boolean initialized;
/**
* Class constructor. Note: the services collection is copied into the group's member set.
* @param services Services list
*/
ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
- final E closeEntity, final List<ClusterSingletonServiceRegistration> services) {
- Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
+ final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
+ checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
this.identifier = identifier;
- this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
- this.serviceEntity = Preconditions.checkNotNull(mainEntity);
- this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
- this.serviceGroup = Preconditions.checkNotNull(services);
+ this.entityOwnershipService = requireNonNull(entityOwnershipService);
+ this.serviceEntity = requireNonNull(mainEntity);
+ this.cleanupEntity = requireNonNull(closeEntity);
+ members.addAll(services); // copies the registrations; the caller's collection itself is not retained
+
LOG.debug("Instantiated new service group for {}", identifier);
}
@VisibleForTesting
ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
final E closeEntity, final S entityOwnershipService) {
- this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
+ this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of()); // no initial members
}
@Override
@Override
ListenableFuture<?> closeClusterSingletonGroup() {
- // Assert our future first
- final SettableFuture<Void> future = SettableFuture.create();
- final SettableFuture<Void> existing = closeFuture.getAndSet(future);
- if (existing != null) {
- return existing;
- }
+ final ListenableFuture<?> ret = destroyGroup(); // installs (or reuses) the close future and closes the service entity registration
+ members.clear(); // drop all members so reconciliation stops whatever is still running
+ markDirty();
- if (!lock.tryLock()) {
- // The lock is held, the cleanup will be finished by the owner thread
- LOG.debug("Singleton group {} cleanup postponed", identifier);
- return future;
- }
-
- try {
- lockedClose(future);
- } finally {
- lock.unlock();
+ if (tryLock()) {
+ reconcileState(); // releases the lock before returning
+ } else {
+ LOG.debug("Service group {} postponing sync on close", identifier); // the lock holder re-checks the dirty flag
}
- LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
- return future;
+ return ret;
}
private boolean isClosed() {
return closeFuture.get() != null; // a close future is installed as soon as closing has been requested
}
- @GuardedBy("lock")
- private void startCapture() {
- Verify.verify(capture == null, "Service group {} is already capturing events {}", identifier, capture);
- capture = new ArrayList<>(0);
- LOG.debug("Service group {} started capturing events", identifier);
- }
-
- private List<C> endCapture() {
- final List<C> ret = Verify.verifyNotNull(capture, "Service group {} is not currently capturing", identifier);
- capture = null;
- LOG.debug("Service group {} finished capturing events, {} events to process", identifier, ret.size());
- return ret;
- }
-
- @GuardedBy("lock")
- private void lockedClose(final SettableFuture<Void> future) {
- if (serviceEntityReg != null) {
- // We are still holding the service registration, close it now...
- LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
- startCapture();
- serviceEntityReg.close();
- serviceEntityReg = null;
-
- // This can potentially mutate our state, so all previous checks need to be re-validated.
- endCapture().forEach(this::lockedOwnershipChanged);
- }
-
- // Now check service entity state: if it is still owned, we need to wait until it is acknowledged as
- // unregistered.
- switch (serviceEntityState) {
- case REGISTERED:
- case UNOWNED:
- case UNREGISTERED:
- // We have either successfully shut down, or have never started up, proceed with termination
- break;
- case OWNED:
- // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
- // when that loss is reported.
- LOG.debug("Service group {} is still owned, postponing termination", identifier);
- return;
- case OWNED_JEOPARDY:
- // This is a significant event, as it relates to cluster split/join operations, operators need to know
- // we are waiting for cluster join event.
- LOG.info("Service group {} is still owned with split cluster, postponing termination", identifier);
- return;
- default:
- throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
- }
-
- // We do not own service entity state: we need to ensure services are stopped.
- if (stopServices()) {
- LOG.debug("Service group {} started shutting down services, postponing termination", identifier);
- return;
- }
-
- // Local cleanup completed, release cleanup entity if needed
- if (cleanupEntityReg != null) {
- LOG.debug("Service group {} unregistering cleanup entity {}", identifier, cleanupEntity);
- startCapture();
- cleanupEntityReg.close();
- cleanupEntityReg = null;
-
- // This can potentially mutate our state, so all previous checks need to be re-validated.
- endCapture().forEach(this::lockedOwnershipChanged);
- }
-
- switch (cleanupEntityState) {
- case REGISTERED:
- case UNOWNED:
- case UNREGISTERED:
- // We have either successfully shut down, or have never started up, proceed with termination
- break;
- case OWNED:
- // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
- // when that loss is reported.
- LOG.debug("Service group {} is still owns cleanup, postponing termination", identifier);
- return;
- case OWNED_JEOPARDY:
- // This is a significant event, as it relates to cluster split/join operations, operators need to know
- // we are waiting for cluster join event.
- LOG.info("Service group {} is still owns cleanup with split cluster, postponing termination",
- identifier);
- return;
- default:
- throw new IllegalStateException("Unhandled cleanup entity state " + serviceEntityState);
- }
-
- // No registrations left and no service operations pending, we are done
- LOG.debug("Service group {} completing termination", identifier);
- future.set(null);
- }
-
@Override
void initialize() throws CandidateAlreadyRegisteredException {
- lock.lock();
+ verify(tryLock()); // the lock is expected to be free here; verification fails otherwise
try {
- Preconditions.checkState(serviceEntityState == EntityState.UNREGISTERED,
- "Singleton group %s was already initilized", identifier);
-
- LOG.debug("Initializing service group {} with services {}", identifier, serviceGroup);
- startCapture();
- serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
- serviceEntityState = EntityState.REGISTERED;
- endCapture().forEach(this::lockedOwnershipChanged);
+ checkState(!initialized, "Singleton group %s was already initilized", identifier);
+ LOG.debug("Initializing service group {} with services {}", identifier, members);
+ synchronized (this) {
+ serviceEntityState = EntityState.REGISTERED; // recorded before the candidate is registered
+ serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
+ initialized = true; // only reached when registration did not throw
+ }
} finally {
- lock.unlock();
+ unlock();
}
}
private void checkNotClosed() {
- Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
- identifier);
+ checkState(!isClosed(), "Service group %s has already been closed", identifier); // IllegalStateException once closing was requested
}
@Override
void registerService(final ClusterSingletonServiceRegistration reg) {
final ClusterSingletonService service = reg.getInstance();
- Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ verify(identifier.equals(service.getIdentifier().getValue()));
checkNotClosed();
- lock.lock();
- try {
- Preconditions.checkState(serviceEntityState != EntityState.UNREGISTERED,
- "Service group %s is not initialized yet", identifier);
+ checkState(initialized, "Service group %s is not initialized yet", identifier);
- LOG.debug("Adding service {} to service group {}", service, identifier);
- serviceGroup.add(reg);
+ // First put the service into the member set and flag the change; reconciliation decides whether to start it
+ LOG.debug("Adding service {} to service group {}", service, identifier);
+ verify(members.add(reg)); // fails verification if the same registration is already a member
+ markDirty();
- switch (localServicesState) {
- case STARTED:
- LOG.debug("Service group {} starting late-registered service {}", identifier, service);
- service.instantiateServiceInstance();
- break;
- case STOPPED:
- case STOPPING:
- break;
- default:
- throw new IllegalStateException("Unhandled local services state " + localServicesState);
- }
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
+ if (!tryLock()) {
+ LOG.debug("Service group {} delayed register of {}", identifier, reg); // the lock holder re-checks the dirty flag
+ return;
}
+
+ reconcileState(); // releases the lock before returning
}
@CheckReturnValue
@Override
- boolean unregisterService(final ClusterSingletonServiceRegistration reg) {
+ ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
final ClusterSingletonService service = reg.getInstance();
- Verify.verify(identifier.equals(service.getIdentifier().getValue()));
+ verify(identifier.equals(service.getIdentifier().getValue()));
checkNotClosed();
- lock.lock();
- try {
- // There is a slight problem here, as the type does not match the list type, hence we need to tread
- // carefully.
- if (serviceGroup.size() == 1) {
- Verify.verify(serviceGroup.contains(reg));
- return true;
- }
+ verify(members.remove(reg)); // fails verification if the registration was not a member
+ markDirty();
+ if (members.isEmpty()) {
+ // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
+ // before we start applying state, because while we do not re-enter, the user is free to do whatever,
+ // notably including registering a service with the same ID from the service shutdown hook. That
+ // registration request needs to hit the successor of this group.
+ return destroyGroup();
+ }
- Verify.verify(serviceGroup.remove(reg));
- LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+ if (tryLock()) {
+ reconcileState(); // releases the lock before returning
+ } else {
+ LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
+ }
+ return null; // null: members remained at the emptiness check, so the group is not being destroyed
+ }
- switch (localServicesState) {
- case STARTED:
- LOG.warn("Service group {} stopping unregistered service {}", identifier, service);
- service.closeServiceInstance();
- break;
- case STOPPED:
- case STOPPING:
- break;
- default:
- throw new IllegalStateException("Unhandled local services state " + localServicesState);
- }
+ private synchronized @NonNull ListenableFuture<?> destroyGroup() {
+ final SettableFuture<Void> future = SettableFuture.create();
+ if (!closeFuture.compareAndSet(null, future)) { // close was already requested: hand out the existing future
+ return verifyNotNull(closeFuture.get());
+ }
- return false;
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
+ if (serviceEntityReg != null) {
+ // We are still holding the service registration, close it now...
+ LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
+ serviceEntityReg.close();
+ serviceEntityReg = null;
}
+
+ markDirty(); // the returned future is completed later, by tryReconcileState()
+ return future;
}
@Override
void ownershipChanged(final C ownershipChange) {
LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
- lock.lock();
- try {
- if (capture != null) {
- capture.add(ownershipChange);
- } else {
- lockedOwnershipChanged(ownershipChange);
+ // Entity state fields are guarded by the instance monitor, record the change under it
+ synchronized (this) {
+ lockedOwnershipChanged(ownershipChange);
+ }
+
+ // Only reconcile when a state change has been flagged
+ if (isDirty()) {
+ if (!tryLock()) {
+ // The current lock holder re-checks the dirty flag after unlocking, see reconcileState()
+ LOG.debug("Service group {} postponing ownership change sync", identifier);
+ return;
}
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
+
+ reconcileState();
}
}
*
* @param ownershipChange reported change
*/
- @GuardedBy("lock")
+ @GuardedBy("this")
private void lockedOwnershipChanged(final C ownershipChange) {
final E entity = ownershipChange.getEntity();
if (serviceEntity.equals(entity)) {
serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+ markDirty(); // service entity state was touched, request reconciliation
} else if (cleanupEntity.equals(entity)) {
cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
+ markDirty(); // cleanup entity state was touched, request reconciliation
} else {
LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
}
}
+ @GuardedBy("this")
private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
if (jeopardy) {
switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- if (cleanupEntityReg == null) {
- LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
- return;
- }
-
LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
cleanupEntityState = EntityState.OWNED_JEOPARDY;
break;
throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
}
- stopServices();
return;
}
switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- if (cleanupEntityReg == null) {
- LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
- return;
- }
-
cleanupEntityState = EntityState.OWNED;
- switch (localServicesState) {
- case STARTED:
- LOG.debug("Service group {} already has local services running", identifier);
- break;
- case STOPPED:
- startServices();
- break;
- case STOPPING:
- LOG.debug("Service group {} has local services stopping, postponing startup", identifier);
- break;
- default:
- throw new IllegalStateException("Unhandled local services state " + localServicesState);
- }
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- cleanupEntityState = EntityState.UNOWNED;
- stopServices();
- break;
case REMOTE_OWNERSHIP_LOST_NO_OWNER:
case REMOTE_OWNERSHIP_CHANGED:
cleanupEntityState = EntityState.UNOWNED;
}
}
+ @GuardedBy("this")
private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
if (jeopardy) {
LOG.info("Service group {} service entity ownership uncertain", identifier);
-
- // Service entity ownership is uncertain, which means we want to record the state, but we do not want
- // to stop local services nor do anything with the cleanup entity.
switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- if (serviceEntityReg == null) {
- LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
- return;
- }
-
serviceEntityState = EntityState.OWNED_JEOPARDY;
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
switch (state) {
case LOCAL_OWNERSHIP_GRANTED:
case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
- if (serviceEntityReg == null) {
- LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
- return;
- }
-
+ LOG.debug("Service group {} acquired service entity ownership", identifier);
serviceEntityState = EntityState.OWNED;
- takeOwnership();
break;
case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
case LOCAL_OWNERSHIP_LOST_NO_OWNER:
- LOG.debug("Service group {} lost service entity ownership", identifier);
- serviceEntityState = EntityState.UNOWNED;
- if (stopServices()) {
- LOG.debug("Service group {} already stopping services, postponing cleanup", identifier);
- return;
- }
-
- if (cleanupEntityReg != null) {
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- }
- break;
case REMOTE_OWNERSHIP_CHANGED:
case REMOTE_OWNERSHIP_LOST_NO_OWNER:
- // No need to react, just update the state
+ LOG.debug("Service group {} lost service entity ownership", identifier);
serviceEntityState = EntityState.UNOWNED;
break;
default:
LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
- break;
}
}
- private void finishCloseIfNeeded() {
- final SettableFuture<Void> future = closeFuture.get();
- if (future != null) {
- lock.lock();
+ // Has to be called with lock asserted, which will be released prior to returning
+ private void reconcileState() {
+ // Always check if there is any state change to be applied; conditionalClean() clears the dirty flag first.
+ while (true) {
try {
- lockedClose(future);
+ if (conditionalClean()) {
+ tryReconcileState();
+ }
} finally {
- lock.unlock();
+ // We may have run a round of reconciliation, but any of the following may have happened asynchronously:
+ // - registration
+ // - unregistration
+ // - service future completed
+ // - entity state changed
+ //
+ // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
+ // be dirty again. This closes the following race condition:
+ //
+ // A: runs these checks holding the lock
+ // B: modifies them, fails to acquire lock
+ // A: releases lock -> no one takes care of reconciliation
+
+ unlock();
}
+
+ if (isDirty()) {
+ if (tryLock()) {
+ LOG.debug("Service group {} re-running reconciliation", identifier);
+ continue;
+ }
+
+ LOG.debug("Service group {} will be reconciled by someone else", identifier);
+ } else {
+ LOG.debug("Service group {} is completely reconciled", identifier);
+ }
+
+ break;
}
}
- /*
- * Help method to registered DoubleCandidateEntity. It is first step
- * before the actual instance take Leadership.
- */
- private void takeOwnership() {
- if (isClosed()) {
- LOG.debug("Service group {} is closed, skipping cleanup ownership bid", identifier);
- return;
+ private void serviceTransitionCompleted() { // invoked from the close-future callbacks registered in ensureStopping()
+ markDirty();
+ if (tryLock()) { // when the lock is busy, its holder picks the dirty flag up after unlocking
+ reconcileState();
}
+ }
+
+ // Has to be called with lock asserted.
+ //
+ // Runs a single reconciliation round: snapshots entity ownership and membership under the instance monitor,
+ // then starts or stops services accordingly and, once nothing is running and the service entity is not held,
+ // releases the cleanup entity and completes a pending close future.
+ private void tryReconcileState() {
+ // First take a safe snapshot of current state on which we will base our decisions.
+ final Set<ClusterSingletonServiceRegistration> localMembers;
+ final boolean haveCleanup;
+ final boolean haveService;
+ synchronized (this) {
+ if (serviceEntityReg != null) {
+ switch (serviceEntityState) {
+ case OWNED:
+ case OWNED_JEOPARDY:
+ haveService = true;
+ break;
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ haveService = false;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
+ }
+ } else {
+ haveService = false;
+ }
- LOG.debug("Service group {} registering cleanup entity", identifier);
+ if (haveService && cleanupEntityReg == null) {
+ // We have the service entity but have not registered for cleanup entity. Do that now and retry.
+ LOG.debug("Service group {} registering cleanup entity", identifier);
+ try {
+ cleanupEntityState = EntityState.REGISTERED;
+ cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
+ if (serviceEntityReg != null) {
+ serviceEntityReg.close();
+ serviceEntityReg = null;
+ }
+ }
+ markDirty();
+ return;
+ }
- startCapture();
- try {
- cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
- cleanupEntityState = EntityState.REGISTERED;
- } catch (CandidateAlreadyRegisteredException e) {
- LOG.error("Service group {} failed to take ownership", identifier, e);
- }
+ if (cleanupEntityReg != null) {
+ // Only certain ownership counts: OWNED_JEOPARDY is treated as not having the cleanup entity
+ switch (cleanupEntityState) {
+ case OWNED:
+ haveCleanup = true;
+ break;
+ case OWNED_JEOPARDY:
+ case REGISTERED:
+ case UNOWNED:
+ case UNREGISTERED:
+ haveCleanup = false;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
+ }
+ } else {
+ haveCleanup = false;
+ }
- endCapture().forEach(this::lockedOwnershipChanged);
- }
+ localMembers = ImmutableSet.copyOf(members);
+ }
- /*
- * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void startServices() {
- if (isClosed()) {
- LOG.debug("Service group {} is closed, not starting services", identifier);
+ if (haveService && haveCleanup) {
+ ensureServicesStarting(localMembers);
return;
}
- LOG.debug("Service group {} starting services", identifier);
- serviceGroup.forEach(reg -> {
- final ClusterSingletonService service = reg.getInstance();
- LOG.debug("Starting service {}", service);
- try {
- service.instantiateServiceInstance();
- } catch (Exception e) {
- LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
+ ensureServicesStopping();
+
+ if (!haveService && services.isEmpty()) {
+ LOG.debug("Service group {} has no running services", identifier);
+ final boolean canFinishClose;
+ synchronized (this) {
+ if (cleanupEntityReg != null) {
+ LOG.debug("Service group {} releasing cleanup entity", identifier);
+ cleanupEntityReg.close();
+ cleanupEntityReg = null;
+ }
+
+ switch (cleanupEntityState) {
+ case OWNED:
+ case OWNED_JEOPARDY:
+ case REGISTERED:
+ // When we are registered we need to wait for registration to resolve, otherwise
+ // the notification could be routed to the next incarnation of this group -- which could be
+ // confused by the fact it is not registered, but receives, for example, OWNED notification.
+ canFinishClose = false;
+ break;
+ case UNOWNED:
+ case UNREGISTERED:
+ canFinishClose = true;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
+ }
}
- });
- localServicesState = ServiceState.STARTED;
- LOG.debug("Service group {} services started", identifier);
+ if (canFinishClose) {
+ final SettableFuture<Void> localFuture = closeFuture.get();
+ if (localFuture != null && !localFuture.isDone()) {
+ LOG.debug("Service group {} completing termination", identifier);
+ localFuture.set(null);
+ }
+ }
+ }
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- boolean stopServices() {
- switch (localServicesState) {
- case STARTED:
- localServicesState = ServiceState.STOPPING;
-
- final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
- for (final ClusterSingletonServiceRegistration reg : serviceGroup) {
- final ClusterSingletonService service = reg.getInstance();
- final ListenableFuture<Void> future;
- try {
- future = service.closeServiceInstance();
- } catch (Exception e) {
- LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
- service, e);
- continue;
- }
+ // Has to be called with lock asserted. Stops tracked services missing from localConfig, starts untracked members.
+ @SuppressWarnings("illegalCatch")
+ private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
+ LOG.debug("Service group {} starting services", identifier);
+
+ // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
+ // example when this method is executed as part of unregisterService() call. In that case we need to ensure
+ // services in the list are stopping
+ final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
+ while (it.hasNext()) {
+ final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
+ final ClusterSingletonServiceRegistration reg = entry.getKey();
+ if (!localConfig.contains(reg)) {
+ final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
+ if (newInfo != null) {
+ entry.setValue(newInfo);
+ } else {
+ it.remove(); // null means the service no longer needs tracking
+ }
+ }
+ }
+
+ // Now start every member which is not being tracked yet
+ for (ClusterSingletonServiceRegistration reg : localConfig) {
+ if (!services.containsKey(reg)) {
+ final ClusterSingletonService service = reg.getInstance();
+ LOG.debug("Starting service {}", service);
- serviceCloseFutureList.add(future);
+ try {
+ service.instantiateServiceInstance();
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
+ e);
+ continue; // not tracked, so a later run of this method will attempt the start again
}
- LOG.debug("Service group {} initiated service shutdown", identifier);
+ services.put(reg, ServiceInfo.started());
+ }
+ }
+ }
+
+ // Must be invoked while holding the lock. Pushes every tracked service towards being stopped and forgets
+ // those for which ensureStopping() reports nothing left to track.
+ private void ensureServicesStopping() {
+ services.entrySet().removeIf(tracked -> {
+ final ServiceInfo updated = ensureStopping(tracked.getKey(), tracked.getValue());
+ if (updated == null) {
+ return true;
+ }
+ tracked.setValue(updated);
+ return false;
+ });
+ }
+
+ @SuppressWarnings("illegalCatch")
+ private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
+ switch (info.getState()) { // result: the info to keep tracking, or null when the caller should drop the service
+ case STARTED:
+ final ClusterSingletonService service = reg.getInstance();
+
+ LOG.debug("Service group {} stopping service {}", identifier, service);
+ final @NonNull ListenableFuture<?> future;
+ try {
+ future = verifyNotNull(service.closeServiceInstance());
+ } catch (Exception e) {
+ LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
+ e);
+ return null; // a service which failed to initiate shutdown is dropped from tracking
+ }
- Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
+ Futures.addCallback(future, new FutureCallback<Object>() { // both outcomes request another reconciliation
@Override
- public void onFailure(final Throwable cause) {
- LOG.warn("Service group {} service stopping reported error", identifier, cause);
- onServicesStopped();
+ public void onSuccess(final Object result) {
+ LOG.debug("Service group {} service {} stopped successfully", identifier, service);
+ serviceTransitionCompleted();
}
@Override
- public void onSuccess(final List<Void> nulls) {
- onServicesStopped();
+ public void onFailure(final Throwable cause) {
+ LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
+ serviceTransitionCompleted();
}
}, MoreExecutors.directExecutor());
-
- return localServicesState == ServiceState.STOPPING;
- case STOPPED:
- LOG.debug("Service group {} has already stopped services", identifier);
- return false;
+ return info.toState(ServiceState.STOPPING, future);
case STOPPING:
- LOG.debug("Service group {} is already stopping services", identifier);
- return true;
+ if (info.getFuture().isDone()) { // shutdown finished (successfully or not), stop tracking
+ LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
+ return null;
+ }
+ return info; // still stopping, keep the current info
default:
- throw new IllegalStateException("Unhandled local services state " + localServicesState);
+ throw new IllegalStateException("Unhandled state " + info.getState());
}
}
- void onServicesStopped() {
- LOG.debug("Service group {} finished stopping services", identifier);
- lock.lock();
- try {
- localServicesState = ServiceState.STOPPED;
+ private void markDirty() {
+ dirty = 1; // volatile write flagging that state changed and reconciliation is needed
+ }
- if (isClosed()) {
- LOG.debug("Service group {} closed, skipping service restart check", identifier);
- return;
- }
+ private boolean isDirty() {
+ return dirty != 0; // volatile read only, the flag is left untouched
+ }
- // If we lost the service entity while services were stopping, we need to unregister cleanup entity
- switch (serviceEntityState) {
- case OWNED:
- case OWNED_JEOPARDY:
- // No need to churn cleanup entity
- break;
- case REGISTERED:
- case UNOWNED:
- case UNREGISTERED:
- if (cleanupEntityReg != null) {
- startCapture();
- cleanupEntityReg.close();
- cleanupEntityReg = null;
- endCapture().forEach(this::lockedOwnershipChanged);
- }
- break;
- default:
- throw new IllegalStateException("Unhandled service entity state" + serviceEntityState);
- }
+ private boolean conditionalClean() {
+ return DIRTY_UPDATER.compareAndSet(this, 1, 0); // atomically clears the flag; true only if it was set
+ }
- if (cleanupEntityReg == null) {
- LOG.debug("Service group {} does not have cleanup entity registered, skipping restart check",
- identifier);
- return;
- }
+ private boolean tryLock() {
+ return LOCK_UPDATER.compareAndSet(this, 0, 1); // never blocks: false when the lock is already held
+ }
- // Double-check if the services should really be down
- switch (cleanupEntityState) {
- case OWNED:
- // We have finished stopping services, but we own cleanup, e.g. we should start them again.
- startServices();
- return;
- case UNOWNED:
- case OWNED_JEOPARDY:
- case REGISTERED:
- case UNREGISTERED:
- break;
- default:
- throw new IllegalStateException("Unhandled cleanup entity state" + cleanupEntityState);
- }
- } finally {
- lock.unlock();
- finishCloseIfNeeded();
- }
+ private boolean unlock() {
+ verify(LOCK_UPDATER.compareAndSet(this, 1, 0)); // verification fails if the lock was not held
+ return true;
+ }
@Override