2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.CheckReturnValue;
25 import javax.annotation.concurrent.GuardedBy;
26 import javax.annotation.concurrent.ThreadSafe;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
41 * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
44 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
45 * a result of new candidates appearing. We use two entities:
46 * - service entity, to which all nodes register
47 * - cleanup entity, which only the service entity owner registers to
50 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
51 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
52 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
53 * granted until the old owner finishes the cleanup.
55 * @param <P> the instance identifier path type
56 * @param <E> the GenericEntity type
57 * @param <C> the GenericEntityOwnershipChange type
58 * @param <G> the GenericEntityOwnershipListener type
59 * @param <S> the GenericEntityOwnershipService type
62 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
63 C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
64 S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
// Last reported ownership state of an entity registration; the type of both serviceEntityState and
// cleanupEntityState.
// NOTE(review): the constant declarations themselves are not visible in this extract. Code elsewhere in the class
// references UNREGISTERED, REGISTERED, OWNED, OWNED_JEOPARDY and UNOWNED -- confirm they map, in that order, to the
// descriptions below.
66 private enum EntityState {
68 * This entity was never registered.
72 * Registration exists, but we are waiting for it to resolve.
76 * Registration indicated we are the owner.
80 * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
81 * another partition, for example.
85 * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
86 * do not need an UNOWNED_JEOPARDY state.
// Lifecycle state of the locally hosted services; the type of localServicesState.
// NOTE(review): the constant declarations are not visible in this extract. Code elsewhere in the class references
// STOPPED, STARTED and STOPPING -- confirm they map, in that order, to the descriptions below.
91 private enum ServiceState {
93 * Local services are stopped.
97 * Local services are up and running.
99 // FIXME: we should support async startup, which will require a STARTING state.
102 * Local services are being stopped.
107 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
// Ownership service used to register this group as a candidate for both entities below.
109 private final S entityOwnershipService;
// Group identifier; the constructor rejects an empty string.
110 private final String identifier;
112 /* Entity instances */
// Registered by initialize().
113 private final E serviceEntity;
// Registered by takeOwnership().
114 private final E cleanupEntity;
// Constructed with fairness enabled (the 'true' argument).
116 private final ReentrantLock lock = new ReentrantLock(true);
// Services belonging to this group; mutated by registerService() and unregisterService().
119 private final List<ClusterSingletonService> serviceGroup;
122 * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
123 * - user calling close()
124 * - service entity ownership
125 * - cleanup entity ownership
126 * - service shutdown future
128 * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
129 * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
130 * due to boilerplate overhead.
132 * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
133 * based on recorded bits -- without explicit representation of state machine state.
136 * Group close future. It can only go from null to non-null reference. Whenever it is non-null, it indicates that
137 * the user has closed the group and we are converging to termination.
139 // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
140 // as we perform a volatile read after unlocking -- that volatile read may be easier on L1 cache.
141 // XXX: above needs a microbenchmark if contention ever becomes a problem.
// Written by closeClusterSingletonGroup(); read by isClosed(), checkNotClosed() and finishCloseIfNeeded().
142 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
145 * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
146 * acquire {@link #cleanupEntity}.
// Assigned by initialize(); closed and reset to null by lockedClose().
149 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg = null;
151 * Service (base) entity last reported state.
154 private EntityState serviceEntityState = EntityState.UNREGISTERED;
157 * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
// Assigned by takeOwnership(); closed and reset to null once the service entity is lost or the group closes.
161 private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
163 * Cleanup (owner) entity last reported state.
166 private EntityState cleanupEntityState = EntityState.UNREGISTERED;
169 * Optional event capture list. This field is initialized when we interact with entity ownership service, to capture
170 * events reported during EOS method invocation -- like immediate acquisition of entity when we register it. This
171 * prevents bugs from recursion.
// Assigned by startCapture(); while it is non-null, ownershipChanged() appends events to it instead of handling them.
174 private List<C> capture = null;
177 * State of local services.
180 private ServiceState localServicesState = ServiceState.STOPPED;
183 * Class constructor. Note: last argument is reused as-is.
185 * @param identifier non-empty string as identifier
186 * @param mainEntity as Entity instance
187 * @param closeEntity as Entity instance
188 * @param entityOwnershipService GenericEntityOwnershipService instance
189 * @param parent parent service
190 * @param services Services list
// Primary constructor: validates and stores its arguments.
// - identifier must be non-empty (IllegalArgumentException otherwise; a null identifier fails on isEmpty()).
// - entityOwnershipService, mainEntity, closeEntity and services must be non-null (checkNotNull).
// - mainEntity becomes serviceEntity and closeEntity becomes cleanupEntity.
// - services is stored by reference, not copied, so later mutations of the group act on the caller's list.
192 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
193 final E closeEntity, final List<ClusterSingletonService> services) {
194 Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
195 this.identifier = identifier;
196 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
197 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
198 this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
199 this.serviceGroup = Preconditions.checkNotNull(services);
200 LOG.debug("Instantiated new service group for {}", identifier);
// Convenience constructor: delegates to the five-argument constructor with a new, empty ArrayList (initial
// capacity 1) as the service list. Note the parameter order differs from the primary constructor: here the two
// entities precede the ownership service.
204 ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
205 final E closeEntity, final S entityOwnershipService) {
206 this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
// NOTE(review): the body is not visible in this extract; presumably returns the 'identifier' field -- confirm.
210 public String getIdentifier() {
// Requests shutdown of the group and returns a future tracking it. A fresh SettableFuture is swapped into
// closeFuture before the lock is attempted, so isClosed() reports true from that point on.
// NOTE(review): getAndSet() overwrites closeFuture even when a close future already exists; the body of the
// 'existing != null' branch is not visible in this extract -- confirm a repeated call cannot orphan the future
// handed out by the first call.
215 ListenableFuture<?> closeClusterSingletonGroup() {
216 // Assert our future first
217 final SettableFuture<Void> future = SettableFuture.create();
218 final SettableFuture<Void> existing = closeFuture.getAndSet(future);
219 if (existing != null) {
// Non-blocking attempt: when another thread holds the lock, closing is left to that thread.
223 if (!lock.tryLock()) {
224 // The lock is held, the cleanup will be finished by the owner thread
225 LOG.debug("Singleton group {} cleanup postponed", identifier);
// Reports whether termination already completed ("closed") or is still converging ("closing").
235 LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
// Returns true once a close future has been installed in closeFuture, i.e. after closeClusterSingletonGroup()
// has been called. It does not indicate that termination has finished.
239 private boolean isClosed() {
240 return closeFuture.get() != null;
// Begins capturing ownership events: while 'capture' is non-null, ownershipChanged() queues incoming events into it
// instead of processing them immediately.
// Fails verification if a capture is already in progress. The failure message uses %s placeholders because Guava's
// Verify only substitutes %s; SLF4J-style {} placeholders would be left in the message verbatim.
244 private void startCapture() {
245 Verify.verify(capture == null, "Service group %s is already capturing events %s", identifier, capture);
246 capture = new ArrayList<>(0);
247 LOG.debug("Service group {} started capturing events", identifier);
// Ends an event capture and hands the captured events to the caller, which replays them through
// lockedOwnershipChanged().
// Fails verification if no capture is in progress. The failure message uses a %s placeholder because Guava's Verify
// only substitutes %s; an SLF4J-style {} placeholder would be left in the message verbatim.
// NOTE(review): the statements that reset 'capture' and return 'ret' are not visible in this extract.
250 private List<C> endCapture() {
251 final List<C> ret = Verify.verifyNotNull(capture, "Service group %s is not currently capturing", identifier);
253 LOG.debug("Service group {} finished capturing events, {} events to process", identifier, ret.size());
// Drives group termination; intended to run with the lock held. The visible steps are, in order:
//   1. close the service entity registration, replaying any events captured while closing it;
//   2. inspect serviceEntityState -- termination is postponed while the entity is still reported as owned;
//   3. stop local services -- termination is postponed while stopServices() reports a shutdown in progress;
//   4. close the cleanup entity registration, replaying any events captured while closing it;
//   5. inspect cleanupEntityState -- termination is postponed while the entity is still reported as owned;
//   6. log completion.
// NOTE(review): the case labels, early returns and the completion of 'future' are not visible in this extract.
//
// @param future the close future of this group
258 private void lockedClose(final SettableFuture<Void> future) {
259 if (serviceEntityReg != null) {
260 // We are still holding the service registration, close it now...
261 LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
263 serviceEntityReg.close();
264 serviceEntityReg = null;
266 // This can potentially mutate our state, so all previous checks need to be re-validated.
267 endCapture().forEach(this::lockedOwnershipChanged);
270 // Now check service entity state: if it is still owned, we need to wait until it is acknowledged as
272 switch (serviceEntityState) {
276 // We have either successfully shut down, or have never started up, proceed with termination
279 // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
280 // when that loss is reported.
281 LOG.debug("Service group {} is still owned, postponing termination", identifier);
284 // This is a significant event, as it relates to cluster split/join operations, operators need to know
285 // we are waiting for cluster join event.
286 LOG.info("Service group {} is still owned with split cluster, postponing termination", identifier);
289 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
292 // We do not own service entity state: we need to ensure services are stopped.
293 if (stopServices()) {
294 LOG.debug("Service group {} started shutting down services, postponing termination", identifier);
298 // Local cleanup completed, release cleanup entity if needed
299 if (cleanupEntityReg != null) {
300 LOG.debug("Service group {} unregistering cleanup entity {}", identifier, cleanupEntity);
302 cleanupEntityReg.close();
303 cleanupEntityReg = null;
305 // This can potentially mutate our state, so all previous checks need to be re-validated.
306 endCapture().forEach(this::lockedOwnershipChanged);
309 switch (cleanupEntityState) {
313 // We have either successfully shut down, or have never started up, proceed with termination
316 // We have unregistered, but EOS has not reported our loss of ownership. We will continue with shutdown
317 // when that loss is reported.
318 LOG.debug("Service group {} is still owns cleanup, postponing termination", identifier);
321 // This is a significant event, as it relates to cluster split/join operations, operators need to know
322 // we are waiting for cluster join event.
323 LOG.info("Service group {} is still owns cleanup with split cluster, postponing termination",
// Report the value this switch actually dispatched on (cleanupEntityState, not serviceEntityState).
327 throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
330 // No registrations left and no service operations pending, we are done
331 LOG.debug("Service group {} completing termination", identifier);
// Registers this group as a candidate for the service entity. May only be called while serviceEntityState is
// UNREGISTERED; otherwise checkState throws IllegalStateException.
// Ownership events captured during registerCandidate() are replayed once the registration and the REGISTERED state
// have been recorded.
// NOTE(review): locking and the startCapture() call paired with endCapture() are not visible in this extract.
//
// @throws CandidateAlreadyRegisteredException propagated from registerCandidate()
336 void initialize() throws CandidateAlreadyRegisteredException {
339 Preconditions.checkState(serviceEntityState == EntityState.UNREGISTERED,
340 "Singleton group %s was already initilized", identifier);
342 LOG.debug("Initializing service group {} with services {}", identifier, serviceGroup);
344 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
345 serviceEntityState = EntityState.REGISTERED;
346 endCapture().forEach(this::lockedOwnershipChanged);
// Throws IllegalStateException if a close future has already been installed, i.e. the group was closed.
352 private void checkNotClosed() {
353 Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
// Adds a service to this group. The service's identifier must equal the group identifier (verified), and the group
// must already be initialized (serviceEntityState != UNREGISTERED), otherwise IllegalStateException is thrown.
// After the service is added, the action taken depends on localServicesState; the visible branch instantiates the
// late-registered service immediately.
// NOTE(review): case labels and locking are not visible in this extract; the log message suggests the
// instantiate branch is the one taken when local services are already running -- confirm.
//
// @param service service to add to the group
358 void registerService(final ClusterSingletonService service) {
359 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
364 Preconditions.checkState(serviceEntityState != EntityState.UNREGISTERED,
365 "Service group %s is not initialized yet", identifier);
367 LOG.debug("Adding service {} to service group {}", service, identifier);
368 serviceGroup.add(service);
370 switch (localServicesState) {
372 LOG.debug("Service group {} starting late-registered service {}", identifier, service);
373 service.instantiateServiceInstance();
379 throw new IllegalStateException("Unhandled local services state " + localServicesState);
// A close request may have arrived while this method was running; complete it if so.
383 finishCloseIfNeeded();
// Removes a service from this group. The service's identifier must equal the group identifier (verified).
// When the group holds exactly one service, that service must be the one passed in (verified); the rest of that
// branch is not visible in this extract. Otherwise the service is removed from the list (verified to have been
// present) and, depending on localServicesState, its instance is closed.
// NOTE(review): the return statements are not visible here, so the meaning of the boolean result cannot be
// stated from this extract.
// NOTE(review): the future returned by closeServiceInstance() is discarded, so completion or failure of that
// shutdown is not observed -- confirm this is intended.
//
// @param service service to remove from the group
389 boolean unregisterService(final ClusterSingletonService service) {
390 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
395 // There is a slight problem here, as the type does not match the list type, hence we need to tread
397 if (serviceGroup.size() == 1) {
398 Verify.verify(serviceGroup.contains(service));
402 Verify.verify(serviceGroup.remove(service));
403 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
405 switch (localServicesState) {
407 LOG.warn("Service group {} stopping unregistered service {}", identifier, service);
408 service.closeServiceInstance();
414 throw new IllegalStateException("Unhandled local services state " + localServicesState);
// A close request may have arrived while this method was running; complete it if so.
420 finishCloseIfNeeded();
// Entry point for ownership change notifications. While an event capture is active (capture != null) the event is
// queued to be replayed by whoever ends the capture; otherwise it is handled immediately.
// NOTE(review): locking and the 'else' keyword between the two branches are not visible in this extract.
//
// @param ownershipChange reported change
425 void ownershipChanged(final C ownershipChange) {
426 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
430 if (capture != null) {
431 capture.add(ownershipChange);
433 lockedOwnershipChanged(ownershipChange);
// The change may have unblocked a pending close; complete it if so.
437 finishCloseIfNeeded();
442 * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
443 * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
445 * @param ownershipChange reported change
// Dispatches a change by entity: changes for serviceEntity go to serviceOwnershipChanged(), changes for
// cleanupEntity go to cleanupCandidateOwnershipChanged(). A change for any other entity is only logged at WARN.
448 private void lockedOwnershipChanged(final C ownershipChange) {
449 final E entity = ownershipChange.getEntity();
450 if (serviceEntity.equals(entity)) {
451 serviceOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
452 } else if (cleanupEntity.equals(entity)) {
453 cleanupCandidateOwnershipChanged(ownershipChange.getState(), ownershipChange.inJeopardy());
455 LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
// Records a cleanup entity ownership change in cleanupEntityState. The method contains two switches over 'state':
// the first records OWNED_JEOPARDY / UNOWNED and throws on unknown states, the second records OWNED / UNOWNED and
// only warns on unknown states.
// NOTE(review): the condition selecting between the two switches is not visible in this extract; the log messages
// indicate the first handles the jeopardy (uncertain) case -- confirm against the 'jeopardy' parameter.
//
// @param state reported ownership change state
// @param jeopardy whether the change was reported in jeopardy
459 private void cleanupCandidateOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
462 case LOCAL_OWNERSHIP_GRANTED:
463 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
// Ownership reported after we dropped the registration is stale and must not be recorded.
464 if (cleanupEntityReg == null) {
465 LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
469 LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
470 cleanupEntityState = EntityState.OWNED_JEOPARDY;
472 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
473 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
474 case REMOTE_OWNERSHIP_CHANGED:
475 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
476 LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
477 cleanupEntityState = EntityState.UNOWNED;
480 throw new IllegalStateException("Unhandled cleanup entity jeopardy change " + state);
487 if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
488 // Pair info message with previous jeopardy
489 LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
493 case LOCAL_OWNERSHIP_GRANTED:
494 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
// Ownership reported after we dropped the registration is stale and must not be recorded.
495 if (cleanupEntityReg == null) {
496 LOG.debug("Service group {} ignoring cleanup entity ownership when unregistered", identifier);
500 cleanupEntityState = EntityState.OWNED;
// With cleanup ownership confirmed, what happens next depends on the state of local services.
// NOTE(review): the case labels and the branch that starts services are not visible in this extract.
501 switch (localServicesState) {
503 LOG.debug("Service group {} already has local services running", identifier);
509 LOG.debug("Service group {} has local services stopping, postponing startup", identifier);
512 throw new IllegalStateException("Unhandled local services state " + localServicesState);
515 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
516 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
517 cleanupEntityState = EntityState.UNOWNED;
520 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
521 case REMOTE_OWNERSHIP_CHANGED:
522 cleanupEntityState = EntityState.UNOWNED;
525 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
// Records a service entity ownership change in serviceEntityState and reacts to loss of ownership. The method
// contains two switches over 'state': the first records OWNED_JEOPARDY / UNOWNED and throws on unknown states, the
// second records OWNED / UNOWNED, and on local loss of ownership stops services and releases the cleanup entity
// registration.
// Messages for unknown states name the service entity, since that is the entity this handler processes.
// NOTE(review): the condition selecting between the two switches, and the statements following the transition to
// OWNED, are not visible in this extract; the log messages indicate the first switch handles the jeopardy
// (uncertain) case -- confirm against the 'jeopardy' parameter.
//
// @param state reported ownership change state
// @param jeopardy whether the change was reported in jeopardy
530 private void serviceOwnershipChanged(final EntityOwnershipChangeState state, final boolean jeopardy) {
532 LOG.info("Service group {} service entity ownership uncertain", identifier);
534 // Service entity ownership is uncertain, which means we want to record the state, but we do not want
535 // to stop local services nor do anything with the cleanup entity.
537 case LOCAL_OWNERSHIP_GRANTED:
538 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
// Ownership reported after we dropped the registration is stale and must not be recorded.
539 if (serviceEntityReg == null) {
540 LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
544 serviceEntityState = EntityState.OWNED_JEOPARDY;
546 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
547 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
548 case REMOTE_OWNERSHIP_CHANGED:
549 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
550 serviceEntityState = EntityState.UNOWNED;
553 throw new IllegalStateException("Unhandled service entity jeopardy change " + state);
558 if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
559 // Pair info message with previous jeopardy
560 LOG.info("Service group {} service entity ownership ascertained", identifier);
564 case LOCAL_OWNERSHIP_GRANTED:
565 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
// Ownership reported after we dropped the registration is stale and must not be recorded.
566 if (serviceEntityReg == null) {
567 LOG.debug("Service group {} ignoring service entity ownership when unregistered", identifier);
571 serviceEntityState = EntityState.OWNED;
574 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
575 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
576 LOG.debug("Service group {} lost service entity ownership", identifier);
577 serviceEntityState = EntityState.UNOWNED;
// While services are still stopping, the cleanup entity is kept.
578 if (stopServices()) {
579 LOG.debug("Service group {} already stopping services, postponing cleanup", identifier);
583 if (cleanupEntityReg != null) {
584 cleanupEntityReg.close();
585 cleanupEntityReg = null;
588 case REMOTE_OWNERSHIP_CHANGED:
589 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
590 // No need to react, just update the state
591 serviceEntityState = EntityState.UNOWNED;
594 LOG.warn("Service group {} ignoring unhandled service entity change {}", identifier, state);
// Checks whether a close has been requested (closeFuture is non-null) and, if so, acts on it.
// NOTE(review): the body of the 'future != null' branch is not visible in this extract; presumably it continues
// termination via lockedClose(future) -- confirm, including how the lock is acquired.
599 private void finishCloseIfNeeded() {
600 final SettableFuture<Void> future = closeFuture.get();
601 if (future != null) {
612 * Helper method which registers this group as a candidate for the cleanup entity. It is the first step
613 * before the actual instance takes leadership.
// Registers this group as a candidate for the cleanup entity and records the REGISTERED state. A
// CandidateAlreadyRegisteredException is logged at ERROR rather than propagated. Ownership events captured during
// the registration attempt are replayed afterwards.
// NOTE(review): the closed-group guard condition, the startCapture() call and the 'try' keyword are not visible in
// this extract; the first log message indicates the bid is skipped when the group is closed.
615 private void takeOwnership() {
617 LOG.debug("Service group {} is closed, skipping cleanup ownership bid", identifier);
621 LOG.debug("Service group {} registering cleanup entity", identifier);
625 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
626 cleanupEntityState = EntityState.REGISTERED;
627 } catch (CandidateAlreadyRegisteredException e) {
628 LOG.error("Service group {} failed to take ownership", identifier, e);
631 endCapture().forEach(this::lockedOwnershipChanged);
635 * Helper method which calls instantiateServiceInstance on each service to create the single cluster-wide instance.
// Instantiates every service in the group, then marks local services as STARTED.
// A service whose instantiateServiceInstance() throws is logged at WARN and skipped; the remaining services are
// still started and the state still becomes STARTED. That is why the broad catch is suppressed for checkstyle.
// NOTE(review): the closed-group guard condition is not visible in this extract; the first log message indicates
// nothing is started when the group is closed.
637 @SuppressWarnings("checkstyle:IllegalCatch")
638 private void startServices() {
640 LOG.debug("Service group {} is closed, not starting services", identifier);
644 LOG.debug("Service group {} starting services", identifier);
645 serviceGroup.forEach(service -> {
646 LOG.debug("Starting service {}", service);
648 service.instantiateServiceInstance();
649 } catch (Exception e) {
650 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
654 localServicesState = ServiceState.STARTED;
655 LOG.debug("Service group {} services started", identifier);
// Initiates shutdown of all services in the group, depending on localServicesState. In the branch that performs the
// shutdown, the state is set to STOPPING, closeServiceInstance() is called on every service, and a callback is
// attached to the combined future of all collected close futures.
// A service whose closeServiceInstance() throws is logged at WARN and shutdown continues with the others.
// NOTE(review): case labels, the callback bodies and the return values of the non-shutdown branches are not
// visible in this extract. Callers treat a true result as "shutdown still in progress".
//
// @return in the shutdown branch, true if local services are still STOPPING once the callback has been attached
658 @SuppressWarnings("checkstyle:IllegalCatch")
659 boolean stopServices() {
660 switch (localServicesState) {
662 localServicesState = ServiceState.STOPPING;
664 final List<ListenableFuture<?>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
665 for (final ClusterSingletonService service : serviceGroup) {
666 final ListenableFuture<?> future;
669 future = service.closeServiceInstance();
670 } catch (Exception e) {
671 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
676 serviceCloseFutureList.add(future);
679 LOG.debug("Service group {} initiated service shutdown", identifier);
681 Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<?>>() {
683 public void onFailure(final Throwable cause) {
684 LOG.warn("Service group {} service stopping reported error", identifier, cause);
689 public void onSuccess(final List<?> nulls) {
// directExecutor() runs the callback on whichever thread completes the combined future -- or right here, if it
// has already completed -- so the state may have changed by the time the next line re-reads it.
692 }, MoreExecutors.directExecutor());
694 return localServicesState == ServiceState.STOPPING;
696 LOG.debug("Service group {} has already stopped services", identifier);
699 LOG.debug("Service group {} is already stopping services", identifier);
702 throw new IllegalStateException("Unhandled local services state " + localServicesState);
// Invoked once service shutdown has finished: marks local services as STOPPED, then re-evaluates the entities.
//   - Depending on serviceEntityState, the cleanup entity registration is either left alone or closed, with any
//     events captured while closing it replayed afterwards.
//   - If a cleanup registration remains, cleanupEntityState is checked to decide whether services must start again.
// Exception messages for unknown states separate the text from the state value with a space, consistent with the
// other "Unhandled ... state" messages in this class.
// NOTE(review): locking, the closed-group guard, case labels and the startCapture() call paired with endCapture()
// are not visible in this extract.
706 void onServicesStopped() {
707 LOG.debug("Service group {} finished stopping services", identifier);
710 localServicesState = ServiceState.STOPPED;
713 LOG.debug("Service group {} closed, skipping service restart check", identifier);
717 // If we lost the service entity while services were stopping, we need to unregister cleanup entity
718 switch (serviceEntityState) {
721 // No need to churn cleanup entity
726 if (cleanupEntityReg != null) {
728 cleanupEntityReg.close();
729 cleanupEntityReg = null;
730 endCapture().forEach(this::lockedOwnershipChanged);
734 throw new IllegalStateException("Unhandled service entity state " + serviceEntityState);
737 if (cleanupEntityReg == null) {
738 LOG.debug("Service group {} does not have cleanup entity registered, skipping restart check",
743 // Double-check if the services should really be down
744 switch (cleanupEntityState) {
746 // We have finished stopping services, but we own cleanup, e.g. we should start them again.
755 throw new IllegalStateException("Unhandled cleanup entity state " + cleanupEntityState);
// A close request may be waiting for services to stop; complete it if so.
759 finishCloseIfNeeded();
// Renders this group for diagnostics; only the identifier is included.
764 public String toString() {
765 return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();