2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.CheckReturnValue;
25 import javax.annotation.concurrent.GuardedBy;
26 import javax.annotation.concurrent.ThreadSafe;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.yangtools.concepts.Path;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
42 * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
45 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
46 * a result of new candidates appearing. We use two entities:
47 * - service entity, to which all nodes register
48 * - cleanup entity, which only the service entity owner registers to
51 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
52 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
53 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
54 * granted until the old owner finishes the cleanup.
56 * @param <P> the instance identifier path type
57 * @param <E> the GenericEntity type
58 * @param <C> the GenericEntityOwnershipChange type
59 * @param <G> the GenericEntityOwnershipListener type
60 * @param <S> the GenericEntityOwnershipService type
63 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
64 C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
65 S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
67 private enum EntityState {
69 * This entity was never registered.
73 * Registration exists, but we are waiting for it to resolve.
77 * Registration indicated we are the owner.
81 * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
82 * another partition, for example.
86 * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
87 * do not need an UNOWNED_JEOPARDY state.
92 private enum ServiceState {
94 * Local services are stopped.
98 * Local services are up and running.
100 // FIXME: we should support async startup, which will require a STARTING state.
103 * Local services are being stopped.
108 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
110 private final S entityOwnershipService;
111 private final String identifier;
113 /* Entity instances */
114 private final E serviceEntity;
115 private final E cleanupEntity;
117 private final ReentrantLock lock = new ReentrantLock(true);
120 private final List<ClusterSingletonServiceRegistration> serviceGroup;
123 * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
124 * - user calling close()
125 * - service entity ownership
126 * - cleanup entity ownership
127 * - service shutdown future
129 * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
130 * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
131 * due to boilerplate overhead.
133 * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
134 * based on recorded bits -- without explicit representation of state machine state.
137 * Group close future. It can only go from null to a non-null reference. Whenever it is non-null, it indicates that
138 * the user has closed the group and we are converging to termination.
140 // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
141 // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
142 // XXX: the above needs a microbenchmark if contention ever becomes a problem.
143 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
146 * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
147 * acquire {@link #cleanupEntity}.
150 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
152 * Service (base) entity last reported state.
155 private EntityState serviceEntityState = EntityState.UNREGISTERED;
158 * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
162 private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
164 * Cleanup (owner) entity last reported state.
167 private EntityState cleanupEntityState = EntityState.UNREGISTERED;
170 * Optional event capture list. This field is initialized when we interact with entity ownership service, to capture
171 * events reported during EOS method invocation -- like immediate acquisition of entity when we register it. This
172 * prevents bugs from recursion.
175 private List<C> capture = null;
178 * State of local services.
181 private ServiceState localServicesState = ServiceState.STOPPED;
184 * Class constructor. Note: last argument is reused as-is.
186 * @param identifier non-empty string as identifier
187 * @param mainEntity as Entity instance
188 * @param closeEntity as Entity instance
189 * @param entityOwnershipService GenericEntityOwnershipService instance
190 * @param parent parent service
191 * @param services Services list
193 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
194 final E closeEntity, final List<ClusterSingletonServiceRegistration> services) {
195 Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
196 this.identifier = identifier;
197 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
198 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
199 this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
200 this.serviceGroup = Preconditions.checkNotNull(services);
201 LOG.debug("Instantiated new service group for {}", identifier);
205 ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
206 final E closeEntity, final S entityOwnershipService) {
207 this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
211 public String getIdentifier() {
216 ListenableFuture<?> closeClusterSingletonGroup() {
// Initiates group shutdown and returns the future tracking it. The close future must be installed at most once:
// completion paths read it back through closeFuture.get(), so replacing an already-installed future (as
// getAndSet() did) could leave the future handed to an earlier caller uncompleted.
// NOTE(review): lines following the 'existing != null' check are not visible in this listing; this assumes they
// return 'existing' to the caller -- confirm against the full file.
217 // Assert our future first
218 final SettableFuture<Void> future = SettableFuture.create();
219 final SettableFuture<Void> existing = closeFuture.compareAndSet(null, future) ? null : closeFuture.get();
220 if (existing != null) {
224 if (!lock.tryLock()) {
225 // The lock is held, the cleanup will be finished by the owner thread
226 LOG.debug("Singleton group {} cleanup postponed", identifier);
236 LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
240 private boolean isClosed() {
241 return closeFuture.get() != null;
245 private void startCapture() {
// Starts recording ownership events into 'capture' instead of processing them immediately; fails if a capture
// is already in progress. Guava's Verify substitutes %s placeholders only, so the failure message uses %s
// rather than the SLF4J-style {} used for LOG calls.
246 Verify.verify(capture == null, "Service group %s is already capturing events %s", identifier, capture);
247 capture = new ArrayList<>(0);
248 LOG.debug("Service group {} started capturing events", identifier);
251 private List<C> endCapture() {
// Finishes an event capture started by startCapture(); fails if no capture is in progress. Guava's Verify
// substitutes %s placeholders only, so the failure message uses %s rather than the SLF4J-style {}.
252 final List<C> ret = Verify.verifyNotNull(capture, "Service group %s is not currently capturing", identifier);
254 LOG.debug("Service group {} finished capturing events, {} events to process", identifier, ret.size());
259 private void lockedClose(final SettableFuture<Void> future) {
// Drives the group towards termination: releases the service entity registration, checks whether service
// entity ownership is still held, stops local services, then releases the cleanup entity registration and
// checks its ownership. Each stage may postpone termination, as the log messages below indicate.
260 if (serviceEntityReg != null) {
261 // We are still holding the service registration, close it now...
262 LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
264 serviceEntityReg.close();
265 serviceEntityReg = null;
267 // This can potentially mutate our state, so all previous checks need to be re-validated.
268 endCapture().forEach(this::lockedOwnershipChanged);
271 // Now check service entity state: if it is still owned, we need to wait until it is acknowledged as
273 switch (serviceEntityState) {
277 // We have either successfully shut down, or have never started up, proceed with termination
280 // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
281 // when that loss is reported.
282 LOG.debug("Service group {} is still owned, postponing termination", identifier);
285 // This is a significant event, as it relates to cluster split/join operations, operators need to know
286 // we are waiting for cluster join event.
287 LOG.info("Service group {} is still owned with split cluster, postponing termination", identifier);
290 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
293 // We do not own service entity state: we need to ensure services are stopped.
294 if (stopServices()) {
295 LOG.debug("Service group {} started shutting down services, postponing termination", identifier);
299 // Local cleanup completed, release cleanup entity if needed
300 if (cleanupEntityReg != null) {
301 LOG.debug("Service group {} unregistering cleanup entity {}", identifier, cleanupEntity);
303 cleanupEntityReg.close();
304 cleanupEntityReg = null;
306 // This can potentially mutate our state, so all previous checks need to be re-validated.
307 endCapture().forEach(this::lockedOwnershipChanged);
310 switch (cleanupEntityState) {
314 // We have either successfully shut down, or have never started up, proceed with termination
317 // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
318 // when that loss is reported.
319 LOG.debug("Service group {} is still owns cleanup, postponing termination", identifier);
322 // This is a significant event, as it relates to cluster split/join operations, operators need to know
323 // we are waiting for cluster join event.
324 LOG.info("Service group {} is still owns cleanup with split cluster, postponing termination",
// This switch is over cleanupEntityState, so that is the value to report when it is unhandled.
328 throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
331 // No registrations left and no service operations pending, we are done
332 LOG.debug("Service group {} completing termination", identifier);
337 void initialize() throws CandidateAlreadyRegisteredException {
340 Preconditions.checkState(serviceEntityState == EntityState.UNREGISTERED,
341 "Singleton group %s was already initilized", identifier);
343 LOG.debug("Initializing service group {} with services {}", identifier, serviceGroup);
345 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
346 serviceEntityState = EntityState.REGISTERED;
347 endCapture().forEach(this::lockedOwnershipChanged);
353 private void checkNotClosed() {
354 Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
359 void registerService(final ClusterSingletonServiceRegistration reg) {
360 final ClusterSingletonService service = reg.getInstance();
361 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
366 Preconditions.checkState(serviceEntityState != EntityState.UNREGISTERED,
367 "Service group %s is not initialized yet", identifier);
369 LOG.debug("Adding service {} to service group {}", service, identifier);
370 serviceGroup.add(reg);
372 switch (localServicesState) {
374 LOG.debug("Service group {} starting late-registered service {}", identifier, service);
375 service.instantiateServiceInstance();
381 throw new IllegalStateException("Unhandled local services state " + localServicesState);
385 finishCloseIfNeeded();
391 boolean unregisterService(final ClusterSingletonServiceRegistration reg) {
392 final ClusterSingletonService service = reg.getInstance();
393 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
398 // There is a slight problem here, as the type does not match the list type, hence we need to tread
400 if (serviceGroup.size() == 1) {
401 Verify.verify(serviceGroup.contains(reg));
405 Verify.verify(serviceGroup.remove(reg));
406 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
408 switch (localServicesState) {
410 LOG.warn("Service group {} stopping unregistered service {}", identifier, service);
411 service.closeServiceInstance();
417 throw new IllegalStateException("Unhandled local services state " + localServicesState);
423 finishCloseIfNeeded();
428 void ownershipChanged(final C ownershipChange) {
429 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
433 if (capture != null) {
434 capture.add(ownershipChange);
436 lockedOwnershipChanged(ownershipChange);
440 finishCloseIfNeeded();
445 * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
446 * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
448 * @param ownershipChange reported change
451 private void lockedOwnershipChanged(final C ownershipChange) {
452 final E entity = ownershipChange.getEntity();
453 if (serviceEntity.equals(entity)) {
454 serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
455 } else if (cleanupEntity.equals(entity)) {
456 cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
458 LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
462 private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
465 case LOCAL_OWNERSHIP_GRANTED:
466 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
467 if (cleanupEntityReg == null) {
468 LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
472 LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
473 cleanupEntityState = EntityState.OWNED_JEOPARDY;
475 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
476 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
477 case REMOTE_OWNERSHIP_CHANGED:
478 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
479 LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
480 cleanupEntityState = EntityState.UNOWNED;
483 throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
490 if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
491 // Pair info message with previous jeopardy
492 LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
496 case LOCAL_OWNERSHIP_GRANTED:
497 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
498 if (cleanupEntityReg == null) {
499 LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
503 cleanupEntityState = EntityState.OWNED;
504 switch (localServicesState) {
506 LOG.debug("Service group {} already has local services running", identifier);
512 LOG.debug("Service group {} has local services stopping, postponing startup", identifier);
515 throw new IllegalStateException("Unhandled local services state " + localServicesState);
518 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
519 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
520 cleanupEntityState = EntityState.UNOWNED;
523 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
524 case REMOTE_OWNERSHIP_CHANGED:
525 cleanupEntityState = EntityState.UNOWNED;
528 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
533 private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
// Records a service entity ownership change in serviceEntityState. The first group of cases handles changes
// reported with uncertain ownership (only the state is recorded); the second group handles definite changes,
// where losing local ownership also stops services and releases the cleanup entity registration.
// Diagnostics in this method refer to the service entity, not the cleanup entity.
535 LOG.info("Service group {} service entity ownership uncertain", identifier);
537 // Service entity ownership is uncertain, which means we want to record the state, but we do not want
538 // to stop local services nor do anything with the cleanup entity.
540 case LOCAL_OWNERSHIP_GRANTED:
541 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
542 if (serviceEntityReg == null) {
543 LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
547 serviceEntityState = EntityState.OWNED_JEOPARDY;
549 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
550 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
551 case REMOTE_OWNERSHIP_CHANGED:
552 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
553 serviceEntityState = EntityState.UNOWNED;
556 throw new IllegalStateException("Unhandled service entity jeopardy change " + state);
561 if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
562 // Pair info message with previous jeopardy
563 LOG.info("Service group {} service entity ownership ascertained", identifier);
567 case LOCAL_OWNERSHIP_GRANTED:
568 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
569 if (serviceEntityReg == null) {
570 LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
574 serviceEntityState = EntityState.OWNED;
577 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
578 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
579 LOG.debug("Service group {} lost service entity ownership", identifier);
580 serviceEntityState = EntityState.UNOWNED;
581 if (stopServices()) {
582 LOG.debug("Service group {} already stopping services, postponing cleanup", identifier);
586 if (cleanupEntityReg != null) {
587 cleanupEntityReg.close();
588 cleanupEntityReg = null;
591 case REMOTE_OWNERSHIP_CHANGED:
592 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
593 // No need to react, just update the state
594 serviceEntityState = EntityState.UNOWNED;
597 LOG.warn("Service group {} ignoring unhandled service entity change {}", identifier, state);
602 private void finishCloseIfNeeded() {
603 final SettableFuture<Void> future = closeFuture.get();
604 if (future != null) {
615 * Helper method to register the DoubleCandidateEntity. It is the first step
616 * before the actual instance takes leadership.
618 private void takeOwnership() {
620 LOG.debug("Service group {} is closed, skipping cleanup ownership bid", identifier);
624 LOG.debug("Service group {} registering cleanup entity", identifier);
628 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
629 cleanupEntityState = EntityState.REGISTERED;
630 } catch (CandidateAlreadyRegisteredException e) {
631 LOG.error("Service group {} failed to take ownership", identifier, e);
634 endCapture().forEach(this::lockedOwnershipChanged);
638 * Helper method which calls instantiateServiceInstance to create a single cluster-wide service instance.
640 @SuppressWarnings("checkstyle:IllegalCatch")
641 private void startServices() {
643 LOG.debug("Service group {} is closed, not starting services", identifier);
647 LOG.debug("Service group {} starting services", identifier);
648 serviceGroup.forEach(reg -> {
649 final ClusterSingletonService service = reg.getInstance();
650 LOG.debug("Starting service {}", service);
652 service.instantiateServiceInstance();
653 } catch (Exception e) {
654 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
658 localServicesState = ServiceState.STARTED;
659 LOG.debug("Service group {} services started", identifier);
662 @SuppressWarnings("checkstyle:IllegalCatch")
663 boolean stopServices() {
664 switch (localServicesState) {
666 localServicesState = ServiceState.STOPPING;
668 final List<ListenableFuture<?>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
669 for (final ClusterSingletonServiceRegistration reg : serviceGroup) {
670 final ClusterSingletonService service = reg.getInstance();
671 final ListenableFuture<?> future;
673 future = service.closeServiceInstance();
674 } catch (Exception e) {
675 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
680 serviceCloseFutureList.add(future);
683 LOG.debug("Service group {} initiated service shutdown", identifier);
685 Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<?>>() {
687 public void onFailure(final Throwable cause) {
688 LOG.warn("Service group {} service stopping reported error", identifier, cause);
693 public void onSuccess(final List<?> nulls) {
696 }, MoreExecutors.directExecutor());
698 return localServicesState == ServiceState.STOPPING;
700 LOG.debug("Service group {} has already stopped services", identifier);
703 LOG.debug("Service group {} is already stopping services", identifier);
706 throw new IllegalStateException("Unhandled local services state " + localServicesState);
710 void onServicesStopped() {
// Invoked once local services have finished stopping: marks them STOPPED, then re-evaluates the service and
// cleanup entity states to decide whether the cleanup entity should be released or services started again.
711 LOG.debug("Service group {} finished stopping services", identifier);
714 localServicesState = ServiceState.STOPPED;
717 LOG.debug("Service group {} closed, skipping service restart check", identifier);
721 // If we lost the service entity while services were stopping, we need to unregister cleanup entity
722 switch (serviceEntityState) {
725 // No need to churn cleanup entity
730 if (cleanupEntityReg != null) {
732 cleanupEntityReg.close();
733 cleanupEntityReg = null;
734 endCapture().forEach(this::lockedOwnershipChanged);
// Keep a space before the state value so the message reads the same as the equivalent one in lockedClose().
738 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
741 if (cleanupEntityReg == null) {
742 LOG.debug("Service group {} does not have cleanup entity registered, skipping restart check",
747 // Double-check if the services should really be down
748 switch (cleanupEntityState) {
750 // We have finished stopping services, but we own cleanup, e.g. we should start them again.
759 throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
763 finishCloseIfNeeded();
768 public String toString() {
769 return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();