2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.singleton.dom.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.base.Verify.verifyNotNull;
14 import static java.util.Objects.requireNonNull;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.MoreObjects;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.SettableFuture;
25 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.Iterator;
30 import java.util.Map.Entry;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.checkerframework.checker.lock.qual.GuardedBy;
36 import org.checkerframework.checker.lock.qual.Holding;
37 import org.eclipse.jdt.annotation.NonNull;
38 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
39 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
40 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
41 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
42 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
43 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
44 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
45 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
46 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
47 import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
53 * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
56 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
57 * a result of new candidates appearing. We use two entities:
58 * - service entity, to which all nodes register
59 * - cleanup entity, which only the service entity owner registers to
62 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
63 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
64 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
65 * granted until the old owner finishes the cleanup.
67 * @param <P> the instance identifier path type
68 * @param <E> the GenericEntity type
69 * @param <C> the GenericEntityOwnershipChange type
70 * @param <G> the GenericEntityOwnershipListener type
71 * @param <S> the GenericEntityOwnershipService type
73 final class ClusterSingletonServiceGroupImpl<P extends HierarchicalIdentifier<P>, E extends GenericEntity<P>,
74 C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
75 S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
77 private enum EntityState {
79 * This entity was never registered.
83 * Registration exists, but we are waiting for it to resolve.
87 * Registration indicated we are the owner.
91 * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
92 * another partition, for example.
96 * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
97 * do not need an UNOWNED_JEOPARDY state.
104 * Local service is up and running.
106 // FIXME: we should support async startup, which will require a STARTING state.
109 * Local service is being stopped.
114 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
116 private final S entityOwnershipService;
117 private final String identifier;
119 /* Entity instances */
120 private final E serviceEntity;
121 private final E cleanupEntity;
123 private final Set<ClusterSingletonServiceRegistration> members = ConcurrentHashMap.newKeySet();
125 private final Map<ClusterSingletonServiceRegistration, ServiceInfo> services = new HashMap<>();
127 // Marker for when any state changed
128 @SuppressWarnings("rawtypes")
129 private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> DIRTY_UPDATER =
130 AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "dirty");
131 private volatile int dirty;
133 // Simplified lock: non-reentrant, support tryLock() only
134 @SuppressWarnings("rawtypes")
135 private static final AtomicIntegerFieldUpdater<ClusterSingletonServiceGroupImpl> LOCK_UPDATER =
136 AtomicIntegerFieldUpdater.newUpdater(ClusterSingletonServiceGroupImpl.class, "lock");
137 @SuppressWarnings("unused")
138 private volatile int lock;
141 * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
142 * - user calling close()
143 * - service entity ownership
144 * - cleanup entity ownership
145 * - service shutdown future
147 * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
148 * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
149 * due to boilerplate overhead.
151 * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
152 * based on recorded bits -- without explicit representation of state machine state.
155 * Group close future. It can only go from null to non-null reference. Whenever it is non-null, it indicates that
156 * the user has closed the group and we are converging to termination.
158 // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
159 // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
160 // XXX: above needs a microbenchmark if contention ever becomes a problem.
161 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
164 * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
165 * acquire {@link #cleanupEntity}.
168 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
170 * Service (base) entity last reported state.
173 private EntityState serviceEntityState = EntityState.UNREGISTERED;
176 * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
180 private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
182 * Cleanup (owner) entity last reported state.
185 private EntityState cleanupEntityState = EntityState.UNREGISTERED;
187 private volatile boolean initialized;
190 * Class constructor. Note: last argument is reused as-is.
192 * @param identifier non-empty string as identifier
193 * @param mainEntity as Entity instance
194 * @param closeEntity as Entity instance
195 * @param entityOwnershipService GenericEntityOwnershipService instance
196 * @param parent parent service
197 * @param services Services list
199 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
200 final E closeEntity, final Collection<ClusterSingletonServiceRegistration> services) {
201 checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
202 this.identifier = identifier;
203 this.entityOwnershipService = requireNonNull(entityOwnershipService);
204 this.serviceEntity = requireNonNull(mainEntity);
205 this.cleanupEntity = requireNonNull(closeEntity);
206 members.addAll(services);
208 LOG.debug("Instantiated new service group for {}", identifier);
212 ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
213 final E closeEntity, final S entityOwnershipService) {
214 this(identifier, entityOwnershipService, mainEntity, closeEntity, ImmutableList.of());
218 public String getIdentifier() {
223 ListenableFuture<?> closeClusterSingletonGroup() {
224 final ListenableFuture<?> ret = destroyGroup();
231 LOG.debug("Service group {} postponing sync on close", identifier);
237 private boolean isClosed() {
238 return closeFuture.get() != null;
242 void initialize() throws CandidateAlreadyRegisteredException {
245 checkState(!initialized, "Singleton group %s was already initilized", identifier);
246 LOG.debug("Initializing service group {} with services {}", identifier, members);
247 synchronized (this) {
248 serviceEntityState = EntityState.REGISTERED;
249 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
257 private void checkNotClosed() {
258 checkState(!isClosed(), "Service group %s has already been closed", identifier);
262 void registerService(final ClusterSingletonServiceRegistration reg) {
263 final ClusterSingletonService service = verifyRegistration(reg);
266 checkState(initialized, "Service group %s is not initialized yet", identifier);
268 // First put the service
269 LOG.debug("Adding service {} to service group {}", service, identifier);
270 verify(members.add(reg));
274 LOG.debug("Service group {} delayed register of {}", identifier, reg);
282 ListenableFuture<?> unregisterService(final ClusterSingletonServiceRegistration reg) {
283 verifyRegistration(reg);
286 verify(members.remove(reg));
288 if (members.isEmpty()) {
289 // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
290 // before we start applying state, because while we do not re-enter, the user is free to do whatever,
291 // notably including registering a service with the same ID from the service shutdown hook. That
292 // registration request needs to hit the successor of this group.
293 return destroyGroup();
299 LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
304 private ClusterSingletonService verifyRegistration(final ClusterSingletonServiceRegistration reg) {
305 final ClusterSingletonService service = reg.getInstance();
306 verify(identifier.equals(service.getIdentifier().getName()));
310 private synchronized @NonNull ListenableFuture<?> destroyGroup() {
311 final SettableFuture<Void> future = SettableFuture.create();
312 if (!closeFuture.compareAndSet(null, future)) {
313 return verifyNotNull(closeFuture.get());
316 if (serviceEntityReg != null) {
317 // We are still holding the service registration, close it now...
318 LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
319 serviceEntityReg.close();
320 serviceEntityReg = null;
328 void ownershipChanged(final C ownershipChange) {
329 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
331 synchronized (this) {
332 lockedOwnershipChanged(ownershipChange);
337 LOG.debug("Service group {} postponing ownership change sync", identifier);
346 * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
347 * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
349 * @param ownershipChange reported change
352 private void lockedOwnershipChanged(final C ownershipChange) {
353 final E entity = ownershipChange.getEntity();
354 if (serviceEntity.equals(entity)) {
355 serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
357 } else if (cleanupEntity.equals(entity)) {
358 cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
361 LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
366 private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
369 case LOCAL_OWNERSHIP_GRANTED:
370 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
371 LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
372 cleanupEntityState = EntityState.OWNED_JEOPARDY;
374 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
375 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
376 case REMOTE_OWNERSHIP_CHANGED:
377 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
378 LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
379 cleanupEntityState = EntityState.UNOWNED;
382 throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
388 if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
389 // Pair info message with previous jeopardy
390 LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
394 case LOCAL_OWNERSHIP_GRANTED:
395 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
396 cleanupEntityState = EntityState.OWNED;
398 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
399 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
400 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
401 case REMOTE_OWNERSHIP_CHANGED:
402 cleanupEntityState = EntityState.UNOWNED;
405 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
411 private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
413 LOG.info("Service group {} service entity ownership uncertain", identifier);
415 case LOCAL_OWNERSHIP_GRANTED:
416 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
417 serviceEntityState = EntityState.OWNED_JEOPARDY;
419 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
420 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
421 case REMOTE_OWNERSHIP_CHANGED:
422 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
423 serviceEntityState = EntityState.UNOWNED;
426 throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
431 if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
432 // Pair info message with previous jeopardy
433 LOG.info("Service group {} service entity ownership ascertained", identifier);
437 case LOCAL_OWNERSHIP_GRANTED:
438 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
439 LOG.debug("Service group {} acquired service entity ownership", identifier);
440 serviceEntityState = EntityState.OWNED;
442 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
443 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
444 case REMOTE_OWNERSHIP_CHANGED:
445 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
446 LOG.debug("Service group {} lost service entity ownership", identifier);
447 serviceEntityState = EntityState.UNOWNED;
450 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
454 // has to be called with lock asserted, which will be released prior to returning
455 private void reconcileState() {
456 // Always check if there is any state change to be applied.
459 if (conditionalClean()) {
463 // We may have run a round of reconciliation, but any of the following may have happened asynchronously:
466 // - service future completed
467 // - entity state changed
469 // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
470 // be dirty again. This closes the following race condition:
472 // A: runs these checks holding the lock
473 // B: modifies them, fails to acquire lock
474 // A: releases lock -> no one takes care of reconciliation
481 LOG.debug("Service group {} re-running reconciliation", identifier);
485 LOG.debug("Service group {} will be reconciled by someone else", identifier);
487 LOG.debug("Service group {} is completely reconciled", identifier);
494 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
495 justification = "https://github.com/spotbugs/spotbugs/issues/811")
496 private void serviceTransitionCompleted() {
503 // Has to be called with lock asserted
504 private void tryReconcileState() {
505 // First take a safe snapshot of current state on which we will base our decisions.
506 final Set<ClusterSingletonServiceRegistration> localMembers;
507 final boolean haveCleanup;
508 final boolean haveService;
509 synchronized (this) {
510 if (serviceEntityReg != null) {
511 switch (serviceEntityState) {
522 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
528 if (haveService && cleanupEntityReg == null) {
529 // We have the service entity but have not registered for cleanup entity. Do that now and retry.
530 LOG.debug("Service group {} registering cleanup entity", identifier);
532 cleanupEntityState = EntityState.REGISTERED;
533 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
534 } catch (CandidateAlreadyRegisteredException e) {
535 LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
536 if (serviceEntityReg != null) {
537 serviceEntityReg.close();
538 serviceEntityReg = null;
545 if (cleanupEntityReg != null) {
546 switch (cleanupEntityState) {
557 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
563 localMembers = ImmutableSet.copyOf(members);
566 if (haveService && haveCleanup) {
567 ensureServicesStarting(localMembers);
571 ensureServicesStopping();
573 if (!haveService && services.isEmpty()) {
574 LOG.debug("Service group {} has no running services", identifier);
575 final boolean canFinishClose;
576 synchronized (this) {
577 if (cleanupEntityReg != null) {
578 LOG.debug("Service group {} releasing cleanup entity", identifier);
579 cleanupEntityReg.close();
580 cleanupEntityReg = null;
583 switch (cleanupEntityState) {
587 // When we are registered we need to wait for registration to resolve, otherwise
588 // the notification could be routed to the next incarnation of this group -- which could be
589 // confused by the fact it is not registered, but receives, for example, OWNED notification.
590 canFinishClose = false;
594 canFinishClose = true;
597 throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
601 if (canFinishClose) {
602 final SettableFuture<Void> localFuture = closeFuture.get();
603 if (localFuture != null && !localFuture.isDone()) {
604 LOG.debug("Service group {} completing termination", identifier);
605 localFuture.set(null);
611 // Has to be called with lock asserted
612 @SuppressWarnings("illegalCatch")
613 private void ensureServicesStarting(final Set<ClusterSingletonServiceRegistration> localConfig) {
614 LOG.debug("Service group {} starting services", identifier);
616 // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
617 // example when this method is executed as part of unregisterService() call. In that case we need to ensure
618 // services in the list are stopping
619 final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
620 while (it.hasNext()) {
621 final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
622 final ClusterSingletonServiceRegistration reg = entry.getKey();
623 if (!localConfig.contains(reg)) {
624 final ServiceInfo newInfo = ensureStopping(reg, entry.getValue());
625 if (newInfo != null) {
626 entry.setValue(newInfo);
633 // Now make sure member services are being juggled around
634 for (ClusterSingletonServiceRegistration reg : localConfig) {
635 if (!services.containsKey(reg)) {
636 final ClusterSingletonService service = reg.getInstance();
637 LOG.debug("Starting service {}", service);
640 service.instantiateServiceInstance();
641 } catch (Exception e) {
642 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
647 services.put(reg, ServiceInfo.started());
652 // Has to be called with lock asserted
653 private void ensureServicesStopping() {
654 final Iterator<Entry<ClusterSingletonServiceRegistration, ServiceInfo>> it = services.entrySet().iterator();
655 while (it.hasNext()) {
656 final Entry<ClusterSingletonServiceRegistration, ServiceInfo> entry = it.next();
657 final ServiceInfo newInfo = ensureStopping(entry.getKey(), entry.getValue());
658 if (newInfo != null) {
659 entry.setValue(newInfo);
666 @SuppressWarnings("illegalCatch")
667 private ServiceInfo ensureStopping(final ClusterSingletonServiceRegistration reg, final ServiceInfo info) {
668 switch (info.getState()) {
670 final ClusterSingletonService service = reg.getInstance();
672 LOG.debug("Service group {} stopping service {}", identifier, service);
673 final @NonNull ListenableFuture<?> future;
675 future = verifyNotNull(service.closeServiceInstance());
676 } catch (Exception e) {
677 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
682 Futures.addCallback(future, new FutureCallback<Object>() {
684 public void onSuccess(final Object result) {
685 LOG.debug("Service group {} service {} stopped successfully", identifier, service);
686 serviceTransitionCompleted();
690 public void onFailure(final Throwable cause) {
691 LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
692 serviceTransitionCompleted();
694 }, MoreExecutors.directExecutor());
695 return info.toState(ServiceState.STOPPING, future);
697 if (info.getFuture().isDone()) {
698 LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
703 throw new IllegalStateException("Unhandled state " + info.getState());
707 private void markDirty() {
711 private boolean isDirty() {
715 private boolean conditionalClean() {
716 return DIRTY_UPDATER.compareAndSet(this, 1, 0);
719 private boolean tryLock() {
720 return LOCK_UPDATER.compareAndSet(this, 0, 1);
723 private boolean unlock() {
724 verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
729 public String toString() {
730 return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();