2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.annotation.CheckReturnValue;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
26 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
27 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
31 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
32 import org.opendaylight.yangtools.concepts.Path;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
38 * in its operation, while singleton services incur startup and, most notably, cleanup delays, we need to be smart here.
41 * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
42 * a result of new candidates appearing. We use two entities:
43 * - service entity, to which all nodes register
44 * - cleanup entity, which only the service entity owner registers to
47 * Once the cleanup entity ownership is acquired, services are started. As long as the node keeps the cleanup entity
48 * registered, it should remain its owner. In case a new service owner emerges, the old owner will start the cleanup
49 * process, eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see
50 * it granted until the old owner finishes the cleanup.
52 * @param <P> the instance identifier path type
53 * @param <E> the GenericEntity type
54 * @param <C> the GenericEntityOwnershipChange type
55 * @param <G> the GenericEntityOwnershipListener type
56 * @param <S> the GenericEntityOwnershipService type
58 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
59 C extends GenericEntityOwnershipChange<P, E>, G extends GenericEntityOwnershipListener<P, C>,
60 S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
63 * This group has been freshly allocated and has not been started yet.
67 * Operational state. Service entity is registered, but ownership has not been resolved yet.
71 * Operational state. Service entity confirmed to be follower.
75 * Service entity acquired. Attempting to acquire cleanup entity.
79 * Both entities held and user services are being started.
83 * Steady state. Both entities held and services have finished starting.
87 * User services are being stopped due to either loss of an entity or a shutdown.
91 * We have stopped services and are now relinquishing the cleanup entity.
95 * Terminated, this group cannot be used anymore.
100 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
102 private final S entityOwnershipService;
103 private final String identifier;
105 /* Entity instances */
106 private final E serviceEntity;
107 private final E cleanupEntity;
109 private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
110 private final ReentrantLock lock = new ReentrantLock(true);
113 private final List<ClusterSingletonService> serviceGroup;
116 private State state = State.INITIAL;
119 private List<C> capture;
121 /* EOS Candidate Registrations */
123 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
125 private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
128 * Class constructor. Note: the last argument is used as-is; the list is not copied and is modified by this group.
130 * @param identifier non-empty string as identifier
131 * @param entityOwnershipService GenericEntityOwnershipService instance
132 * @param mainEntity service entity instance
133 * @param closeEntity cleanup entity instance
135 * @param services Services list
137 ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
138 final E closeEntity, final List<ClusterSingletonService> services) {
139 Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
140 this.identifier = identifier;
141 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
142 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
143 this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
144 this.serviceGroup = Preconditions.checkNotNull(services);
145 LOG.debug("Instantiated new service group for {}", identifier);
149 ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
150 final E closeEntity, final S entityOwnershipService) {
151 this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
155 public String getIdentifier() {
160 ListenableFuture<?> closeClusterSingletonGroup() {
161 // Assert our future first
162 final SettableFuture<Void> future = SettableFuture.create();
163 final SettableFuture<Void> existing = closeFuture.getAndSet(future);
164 if (existing != null) {
168 if (!lock.tryLock()) {
169 // The lock is held, the cleanup will be finished by the owner thread
170 LOG.debug("Singleton group {} cleanup postponed", identifier);
180 LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
184 private boolean isClosed() {
185 return closeFuture.get() != null;
189 private void updateState(final State newState) {
190 LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
191 state = Verify.verifyNotNull(newState);
195 private void lockedClose(final SettableFuture<Void> future) {
196 if (serviceEntityReg != null) {
197 LOG.debug("Service group {} unregistering", identifier);
198 serviceEntityReg.close();
199 serviceEntityReg = null;
204 // Not started: not much to do
208 // Already done: no-op
212 LOG.debug("Service group {} terminated", identifier);
216 // No-op, we will react to the loss of registration instead.
218 case STOPPING_SERVICES:
219 // Waiting for services. Will resume once we get notified.
221 case RELEASING_OWNERSHIP:
222 // Waiting for cleanup entity to flip, will resume afterwards.
224 case TAKING_OWNERSHIP:
225 // Abort taking of ownership and close
226 LOG.debug("Service group {} aborting ownership bid", identifier);
227 cleanupEntityReg.close();
228 cleanupEntityReg = null;
229 updateState(State.RELEASING_OWNERSHIP);
232 throw new IllegalStateException("Unhandled state " + state);
237 private void terminate(final SettableFuture<Void> future) {
238 updateState(State.TERMINATED);
239 Verify.verify(future.set(null));
243 void initialize() throws CandidateAlreadyRegisteredException {
244 LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
248 Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
251 // Catch events if they fire during this call
252 capture = new ArrayList<>(0);
253 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
254 state = State.REGISTERED;
256 final List<C> captured = capture;
258 captured.forEach(this::lockedOwnershipChanged);
264 private void checkNotClosed() {
265 Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
270 void registerService(final ClusterSingletonService service) {
271 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
274 LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
278 Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
279 serviceGroup.add(service);
283 case STARTING_SERVICES:
284 service.instantiateServiceInstance();
296 boolean unregisterService(final ClusterSingletonService service) {
297 Verify.verify(identifier.equals(service.getIdentifier().getValue()));
302 // There is a slight problem here, as the type does not match the list type, hence we need to tread
304 if (serviceGroup.size() == 1) {
305 Verify.verify(serviceGroup.contains(service));
309 Verify.verify(serviceGroup.remove(service));
310 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
314 case STARTING_SERVICES:
315 service.closeServiceInstance();
324 finishCloseIfNeeded();
329 void ownershipChanged(final C ownershipChange) {
330 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
334 if (capture != null) {
335 capture.add(ownershipChange);
337 lockedOwnershipChanged(ownershipChange);
341 finishCloseIfNeeded();
345 private void lockedOwnershipChanged(final C ownershipChange) {
346 if (ownershipChange.inJeopardy()) {
347 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
352 final E entity = ownershipChange.getEntity();
353 if (serviceEntity.equals(entity)) {
354 serviceOwnershipChanged(ownershipChange);
355 } else if (cleanupEntity.equals(entity)) {
356 cleanupCandidateOwnershipChanged(ownershipChange);
358 LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
362 private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
363 switch (ownershipChange.getState()) {
364 case LOCAL_OWNERSHIP_GRANTED:
366 case TAKING_OWNERSHIP:
374 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
375 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
377 case RELEASING_OWNERSHIP:
378 // Slight cheat: if we are closing down, we just need to notify the future
379 updateState(isClosed() ? State.INITIAL : State.STANDBY);
381 case STARTING_SERVICES:
383 case TAKING_OWNERSHIP:
384 LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
391 case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
392 case REMOTE_OWNERSHIP_CHANGED:
393 case REMOTE_OWNERSHIP_LOST_NO_OWNER:
398 LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
401 private void serviceOwnershipChanged(final C ownershipChange) {
402 switch (ownershipChange.getState()) {
403 case LOCAL_OWNERSHIP_GRANTED:
404 // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
407 case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
408 case LOCAL_OWNERSHIP_LOST_NO_OWNER:
409 // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
413 // Not needed notifications
414 LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
419 private void finishCloseIfNeeded() {
420 final SettableFuture<Void> future = closeFuture.get();
421 if (future != null) {
432 * Helper method registering this node as a candidate for the cleanup entity. This is the first step
433 * before this instance actually takes leadership.
435 private void takeOwnership() {
437 LOG.debug("Service group {} is closed, not taking ownership", identifier);
441 LOG.debug("Group {} taking ownership", identifier);
443 updateState(State.TAKING_OWNERSHIP);
445 cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
446 } catch (CandidateAlreadyRegisteredException e) {
447 LOG.error("Service group {} failed to take ownership", identifier, e);
452 * Helper method calling instantiateServiceInstance() on every service of this group to start its instances.
454 @SuppressWarnings("checkstyle:IllegalCatch")
455 private void startServices() {
457 LOG.debug("Service group {} is closed, not starting services", identifier);
461 LOG.debug("Service group {} starting services", identifier);
462 serviceGroup.forEach(service -> {
463 LOG.debug("Starting service {}", service);
465 service.instantiateServiceInstance();
466 } catch (Exception e) {
467 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
471 LOG.debug("Service group {} services started", identifier);
472 updateState(State.OWNER);
476 * Helper method reacting to the loss of service entity ownership: depending on the current state it stops this
477 * single cluster-wide service instance or abandons a pending cleanup entity bid. The last asynchronous step closes
478 * the cleanup entity registration, which should initiate a new election for the cleanup entity.
480 private void lostOwnership() {
481 LOG.debug("Service group {} lost ownership in state {}", identifier, state);
484 updateState(State.STANDBY);
489 case STARTING_SERVICES:
490 case STOPPING_SERVICES:
491 // No-op, as these will re-check state before proceeding
493 case TAKING_OWNERSHIP:
494 cleanupEntityReg.close();
495 cleanupEntityReg = null;
496 updateState(State.STANDBY);
501 LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
506 @SuppressWarnings("checkstyle:IllegalCatch")
507 void stopServices() {
508 updateState(State.STOPPING_SERVICES);
510 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
511 for (final ClusterSingletonService service : serviceGroup) {
512 final ListenableFuture<Void> future;
515 future = service.closeServiceInstance();
516 } catch (Exception e) {
517 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
522 serviceCloseFutureList.add(future);
525 Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
527 public void onFailure(final Throwable cause) {
528 LOG.warn("Service group {} service stopping reported error", identifier, cause);
533 public void onSuccess(final List<Void> nulls) {
539 void onServicesStopped() {
540 LOG.debug("Service group {} finished stopping services", identifier);
543 if (cleanupEntityReg != null) {
544 updateState(State.RELEASING_OWNERSHIP);
545 cleanupEntityReg.close();
546 cleanupEntityReg = null;
548 updateState(State.STANDBY);
552 finishCloseIfNeeded();
557 public String toString() {
558 return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();