2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.CheckReturnValue;
25 import javax.annotation.concurrent.GuardedBy;
26 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
27 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
33 import org.opendaylight.yangtools.concepts.Path;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
39 * in its operation and singleton services incur startup and, most notably, cleanup costs, we need to do something smart here.
42 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
43 * a result of new candidates appearing. We use two entities:
44 * - service entity, to which all nodes register
45 * - cleanup entity, for which only the service entity owner registers
48 * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
49 * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
50 * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
51 * granted until the old owner finishes the cleanup.
53 * @param <P> the instance identifier path type
54 * @param <E> the GenericEntity type
55 * @param <C> the GenericEntityOwnershipChange type
56 * @param <G> the GenericEntityOwnershipListener type
57 * @param <S> the GenericEntityOwnershipService type
59 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
60 C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
61 S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
64 * This group has been freshly allocated and has not been started yet.
68 * Operational state. Service entity is registered, but ownership was not resolved yet.
72 * Operational state. Service entity confirmed to be follower.
76 * Service entity acquired. Attempting to acquire cleanup entity.
80 * Both entities held and user services are being started.
84 * Steady state. Both entities held and services have finished starting.
88 * User services are being stopped due to either loss of an entity or a shutdown.
92 * We have stopped services and are now relinquishing the cleanup entity.
96 * Terminated, this group cannot be used anymore.
101 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
103 private final S entityOwnershipService;
104 private final String identifier;
106 /* Entity instances */
107 private final E serviceEntity;
108 private final E cleanupEntity;
110 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
111 private final ReentrantLock lock = new ReentrantLock(true);
114 private final List<ClusterSingletonService> serviceGroup;
117 private State state = State.INITIAL;
120 private List<C> capture;
122 /* EOS Candidate Registrations */
124 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
126 private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
129 * Class constructor. Note: last argument is reused as-is.
131 * @param identifier non-empty string as identifier
132 * @param mainEntity as Entity instance
133 * @param closeEntity as Entity instance
134 * @param entityOwnershipService GenericEntityOwnershipService instance
135 * @param parent parent service
136 * @param services Services list
138 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
139 final E closeEntity, final List<ClusterSingletonService> services) {
140 Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
141 this.identifier = identifier;
142 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
143 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
144 this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
145 this.serviceGroup = Preconditions.checkNotNull(services);
146 LOG.debug("Instantiated new service group for {}", identifier);
150 ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
151 final E closeEntity, final S entityOwnershipService) {
152 this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
156 public String getIdentifier() {
161 ListenableFuture<?> closeClusterSingletonGroup() {
162 // Assert our future first
163 final SettableFuture<Void> future = SettableFuture.create();
164 final SettableFuture<Void> existing = closeFuture.getAndSet(future);
165 if (existing != null) {
169 if (!lock.tryLock()) {
170 // The lock is held, the cleanup will be finished by the owner thread
171 LOG.debug("Singleton group {} cleanup postponed", identifier);
181 LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
185 private boolean isClosed() {
186 return closeFuture.get() != null;
190 private void updateState(final State newState) {
191 LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
192 state = Verify.verifyNotNull(newState);
196 private void lockedClose(final SettableFuture<Void> future) {
197 if (serviceEntityReg != null) {
198 LOG.debug("Service group {} unregistering", identifier);
199 serviceEntityReg.close();
200 serviceEntityReg = null;
205 // Not started: not much to do
209 // Already done: no-op
213 LOG.debug("Service group {} terminated", identifier);
217 // No-op, we will react to the loss of registration instead.
219 case STOPPING_SERVICES:
220 // Waiting for services. Will resume once we get notified.
222 case RELEASING_OWNERSHIP:
223 // Waiting for cleanup entity to flip, will resume afterwards.
225 case TAKING_OWNERSHIP:
226 // Abort taking of ownership and close
227 LOG.debug("Service group {} aborting ownership bid", identifier);
228 cleanupEntityReg.close();
229 cleanupEntityReg = null;
230 updateState(State.RELEASING_OWNERSHIP);
233 throw new IllegalStateException("Unhandled state " + state);
238 private void terminate(final SettableFuture<Void> future) {
239 updateState(State.TERMINATED);
240 Verify.verify(future.set(null));
244 void initialize() throws CandidateAlreadyRegisteredException {
245 LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
249 Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
252 // Catch events if they fire during this call
253 capture = new ArrayList<>(0);
254 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
255 state = State.REGISTERED;
257 final List<C> captured = capture;
259 captured.forEach(this::lockedOwnershipChanged);
265 private void checkNotClosed() {
266 Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
271 void registerService(final ClusterSingletonService service) {
272 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
275 LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
279 Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
280 serviceGroup.add(service);
284 case STARTING_SERVICES:
285 service.instantiateServiceInstance();
297 boolean unregisterService(final ClusterSingletonService service) {
298 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
303 // There is a slight problem here, as the type does not match the list type, hence we need to tread
305 if (serviceGroup.size() == 1) {
306 Verify.verify(serviceGroup.contains(service));
310 Verify.verify(serviceGroup.remove(service));
311 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
315 case STARTING_SERVICES:
316 service.closeServiceInstance();
325 finishCloseIfNeeded();
330 void ownershipChanged(final C ownershipChange) {
331 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
335 if (capture != null) {
336 capture.add(ownershipChange);
338 lockedOwnershipChanged(ownershipChange);
342 finishCloseIfNeeded();
346 private void lockedOwnershipChanged(final C ownershipChange) {
347 if (ownershipChange.inJeopardy()) {
348 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
353 final E entity = ownershipChange.getEntity();
354 if (serviceEntity.equals(entity)) {
355 serviceOwnershipChanged(ownershipChange);
356 } else if (cleanupEntity.equals(entity)) {
357 cleanupCandidateOwnershipChanged(ownershipChange);
359 LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
363 private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
364 switch (ownershipChange.getState()) {
365 case LOCAL_OWNERSHIP_GRANTED:
367 case TAKING_OWNERSHIP:
375 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
376 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
378 case RELEASING_OWNERSHIP:
379 // Slight cheat: if we are closing down, we just need to notify the future
380 updateState(isClosed() ? State.INITIAL : State.STANDBY);
382 case STARTING_SERVICES:
384 case TAKING_OWNERSHIP:
385 LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
392 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
393 case REMOTE_OWNERSHIP_CHANGED:
394 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
399 LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
402 private void serviceOwnershipChanged(final C ownershipChange) {
403 switch (ownershipChange.getState()) {
404 case LOCAL_OWNERSHIP_GRANTED:
405 // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
408 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
409 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
410 // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
414 // Not needed notifications
415 LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
420 private void finishCloseIfNeeded() {
421 final SettableFuture<Void> future = closeFuture.get();
422 if (future != null) {
433 * Helper method to register the cleanup (double-candidate) entity. It is the first step
434 * before this instance takes leadership.
436 private void takeOwnership() {
438 LOG.debug("Service group {} is closed, not taking ownership", identifier);
442 LOG.debug("Group {} taking ownership", identifier);
444 updateState(State.TAKING_OWNERSHIP);
446 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
447 } catch (CandidateAlreadyRegisteredException e) {
448 LOG.error("Service group {} failed to take ownership", identifier, e);
453 * Helper method that calls instantiateServiceInstance on every registered service to create its single cluster-wide instance.
455 @SuppressWarnings("checkstyle:IllegalCatch")
456 private void startServices() {
458 LOG.debug("Service group {} is closed, not starting services", identifier);
462 LOG.debug("Service group {} starting services", identifier);
463 serviceGroup.forEach(service -> {
464 LOG.debug("Starting service {}", service);
466 service.instantiateServiceInstance();
467 } catch (Exception e) {
468 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
472 LOG.debug("Service group {} services started", identifier);
473 updateState(State.OWNER);
477 * Helper method invoked when service entity ownership is lost; it is responsible for stopping this single cluster-wide service instance.
478 * The last asynchronous step has to close the cleanup entity registration, which should initiate
479 * a new election for the cleanup entity.
481 private void lostOwnership() {
482 LOG.debug("Service group {} lost ownership in state {}", identifier, state);
485 updateState(State.STANDBY);
490 case STARTING_SERVICES:
491 case STOPPING_SERVICES:
492 // No-op, as these will re-check state before proceeding
494 case TAKING_OWNERSHIP:
495 cleanupEntityReg.close();
496 cleanupEntityReg = null;
497 updateState(State.STANDBY);
502 LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
507 @SuppressWarnings("checkstyle:IllegalCatch")
508 void stopServices() {
509 updateState(State.STOPPING_SERVICES);
511 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
512 for (final ClusterSingletonService service : serviceGroup) {
513 final ListenableFuture<Void> future;
516 future = service.closeServiceInstance();
517 } catch (Exception e) {
518 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
523 serviceCloseFutureList.add(future);
526 Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
528 public void onFailure(final Throwable cause) {
529 LOG.warn("Service group {} service stopping reported error", identifier, cause);
534 public void onSuccess(final List<Void> nulls) {
537 }, MoreExecutors.directExecutor());
540 void onServicesStopped() {
541 LOG.debug("Service group {} finished stopping services", identifier);
544 if (cleanupEntityReg != null) {
545 updateState(State.RELEASING_OWNERSHIP);
546 cleanupEntityReg.close();
547 cleanupEntityReg = null;
549 updateState(State.STANDBY);
553 finishCloseIfNeeded();
558 public String toString() {
559 return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();