Fix EOSClusterSingletonServiceProvider shutdown
[mdsal.git] / singleton-service / mdsal-singleton-impl / src / main / java / org / opendaylight / mdsal / singleton / impl / ActiveServiceGroup.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.impl;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.MoreObjects;
17 import com.google.common.collect.ImmutableList;
18 import com.google.common.collect.ImmutableSet;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.checkerframework.checker.lock.qual.GuardedBy;
32 import org.checkerframework.checker.lock.qual.Holding;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
35 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
36 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
37 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
38 import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
39 import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Implementation of {@link ServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
46  * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
47  *
48  * <p>
49  * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
50  * a result on new candidates appearing. We use two entities:
51  * <ol>
52  *   <li>service entity, to which all nodes register</li>
53  *   <li>cleanup entity, which only the service entity owner registers to</li>
54  * </ol>
55  *
56  * <p>
57  * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
58  * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
59  * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
60  * granted until the old owner finishes the cleanup.
61  */
62 final class ActiveServiceGroup extends ServiceGroup {
63
64     private enum EntityState {
65         /**
66          * This entity was never registered.
67          */
68         UNREGISTERED,
69         /**
70          * Registration exists, but we are waiting for it to resolve.
71          */
72         REGISTERED,
73         /**
74          * Registration indicated we are the owner.
75          */
76         OWNED,
77         /**
78          * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
79          * another partition, for example.
80          */
81         OWNED_JEOPARDY,
82         /**
83          * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
84          * do not need an UNOWNED_JEOPARDY state.
85          */
86         UNOWNED,
87     }
88
89     enum ServiceState {
90         /**
91          * Local service is up and running.
92          */
93         // FIXME: we should support async startup, which will require a STARTING state.
94         STARTED,
95         /**
96          * Local service is being stopped.
97          */
98         STOPPING,
99     }
100
101     private static final Logger LOG = LoggerFactory.getLogger(ActiveServiceGroup.class);
102
103     private final @NonNull DOMEntityOwnershipService entityOwnershipService;
104     private final @NonNull ServiceGroupIdentifier identifier;
105
106     /* Entity instances */
107     private final @NonNull DOMEntity serviceEntity;
108     private final @NonNull DOMEntity cleanupEntity;
109
110     private final Set<ServiceRegistration> members = ConcurrentHashMap.newKeySet();
111     // Guarded by lock
112     private final Map<ServiceRegistration, ServiceInfo> services = new HashMap<>();
113
114     // Marker for when any state changed
115     private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> DIRTY_UPDATER =
116             AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "dirty");
117     private volatile int dirty;
118
119     // Simplified lock: non-reentrant, support tryLock() only
120     private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> LOCK_UPDATER =
121             AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "lock");
122     @SuppressWarnings("unused")
123     private volatile int lock;
124
125     /*
126      * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
127      * - user calling close()
128      * - service entity ownership
129      * - cleanup entity ownership
130      * - service shutdown future
131      *
132      * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
133      * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
134      * due to boilerplate overhead.
135      *
136      * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
137      * based on recorded bits -- without explicit representation of state machine state.
138      */
139     /**
140      * Group close future. In can only go from null to non-null reference. Whenever it is non-null, it indicates that
141      * the user has closed the group and we are converging to termination.
142      */
143     // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
144     // as we perform a volatile read after unlocking -- that volatile read may easier on L1 cache.
145     // XXX: above needs a microbenchmark contention ever becomes a problem.
146     private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
147
148     /**
149      * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
150      * acquire {@link #cleanupEntity}.
151      */
152     @GuardedBy("this")
153     private Registration serviceEntityReg = null;
154     /**
155      * Service (base) entity last reported state.
156      */
157     @GuardedBy("this")
158     private EntityState serviceEntityState = EntityState.UNREGISTERED;
159
160     /**
161      * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
162      * and startup.
163      */
164     @GuardedBy("this")
165     private Registration cleanupEntityReg;
166     /**
167      * Cleanup (owner) entity last reported state.
168      */
169     @GuardedBy("this")
170     private EntityState cleanupEntityState = EntityState.UNREGISTERED;
171
172     private volatile boolean initialized;
173
174     /**
175      * Class constructor. Note: last argument is reused as-is.
176      *
177      * @param identifier non-empty string as identifier
178      * @param serviceEntity as Entity instance
179      * @param cleanupEntity as Entity instance
180      * @param entityOwnershipService GenericEntityOwnershipService instance
181      * @param parent parent service
182      * @param services Services list
183      */
184     ActiveServiceGroup(final ServiceGroupIdentifier identifier, final DOMEntityOwnershipService entityOwnershipService,
185             final DOMEntity serviceEntity, final DOMEntity cleanupEntity, final List<ServiceRegistration> services) {
186         this.identifier = requireNonNull(identifier);
187         this.entityOwnershipService = requireNonNull(entityOwnershipService);
188         this.serviceEntity = requireNonNull(serviceEntity);
189         this.cleanupEntity = requireNonNull(cleanupEntity);
190         members.addAll(services);
191
192         LOG.debug("Instantiated new service group for {}", identifier);
193     }
194
195     @VisibleForTesting
196     ActiveServiceGroup(final ServiceGroupIdentifier identifier, final DOMEntity serviceEntity,
197             final DOMEntity cleanupEntity, final DOMEntityOwnershipService entityOwnershipService) {
198         this(identifier, entityOwnershipService, serviceEntity, cleanupEntity, ImmutableList.of());
199     }
200
201     @Override
202     public ServiceGroupIdentifier getIdentifier() {
203         return identifier;
204     }
205
206     @Override
207     ListenableFuture<?> closeClusterSingletonGroup() {
208         final var ret = destroyGroup();
209         members.clear();
210         markDirty();
211
212         if (tryLock()) {
213             reconcileState();
214         } else {
215             LOG.debug("Service group {} postponing sync on close", identifier);
216         }
217
218         return ret;
219     }
220
221     private boolean isClosed() {
222         return closeFuture.get() != null;
223     }
224
225     @Override
226     void initialize() throws CandidateAlreadyRegisteredException {
227         verify(tryLock());
228         try {
229             checkState(!initialized, "Singleton group %s was already initilized", identifier);
230             LOG.debug("Initializing service group {} with services {}", identifier, members);
231             synchronized (this) {
232                 serviceEntityState = EntityState.REGISTERED;
233                 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
234                 initialized = true;
235             }
236         } finally {
237             unlock();
238         }
239     }
240
241     private void checkNotClosed() {
242         checkState(!isClosed(), "Service group %s has already been closed", identifier);
243     }
244
245     @Override
246     void registerService(final ServiceRegistration reg) {
247         final var service = verifyRegistration(reg);
248         checkNotClosed();
249
250         checkState(initialized, "Service group %s is not initialized yet", identifier);
251
252         // First put the service
253         LOG.debug("Adding service {} to service group {}", service, identifier);
254         verify(members.add(reg));
255         markDirty();
256
257         if (!tryLock()) {
258             LOG.debug("Service group {} delayed register of {}", identifier, reg);
259             return;
260         }
261
262         reconcileState();
263     }
264
265     @Override
266     ListenableFuture<?> unregisterService(final ServiceRegistration reg) {
267         verifyRegistration(reg);
268         checkNotClosed();
269
270         verify(members.remove(reg));
271         markDirty();
272         if (members.isEmpty()) {
273             // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
274             // before we start applying state, because while we do not re-enter, the user is free to do whatever,
275             // notably including registering a service with the same ID from the service shutdown hook. That
276             // registration request needs to hit the successor of this group.
277             return destroyGroup();
278         }
279
280         if (tryLock()) {
281             reconcileState();
282         } else {
283             LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
284         }
285         return null;
286     }
287
288     private ClusterSingletonService verifyRegistration(final ServiceRegistration reg) {
289         final var service = reg.getInstance();
290         verify(identifier.equals(service.getIdentifier()));
291         return service;
292     }
293
294     private synchronized @NonNull ListenableFuture<?> destroyGroup() {
295         final var future = SettableFuture.<Void>create();
296         final var witness = closeFuture.compareAndExchange(null, future);
297         if (witness != null) {
298             return witness;
299         }
300
301         if (serviceEntityReg != null) {
302             // We are still holding the service registration, close it now...
303             LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
304             serviceEntityReg.close();
305             serviceEntityReg = null;
306         }
307
308         markDirty();
309         return future;
310     }
311
312     @Override
313     void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change, final boolean inJeopardy) {
314         synchronized (this) {
315             lockedOwnershipChanged(entity, change, inJeopardy);
316         }
317
318         if (isDirty()) {
319             if (!tryLock()) {
320                 LOG.debug("Service group {} postponing ownership change sync", identifier);
321                 return;
322             }
323
324             reconcileState();
325         }
326     }
327
328     /**
329      * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
330      * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
331      *
332      * @param ownershipChange reported change
333      */
334     @Holding("this")
335     private void lockedOwnershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
336             final boolean inJeopardy) {
337         if (serviceEntity.equals(entity)) {
338             serviceOwnershipChanged(change, inJeopardy);
339             markDirty();
340         } else if (cleanupEntity.equals(entity)) {
341             cleanupCandidateOwnershipChanged(change, inJeopardy);
342             markDirty();
343         } else {
344             LOG.warn("Group {} received unrecognized entity {}", identifier, entity);
345         }
346     }
347
348     @Holding("this")
349     private void cleanupCandidateOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
350         if (jeopardy) {
351             cleanupEntityState = switch (state) {
352                 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> {
353                     LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
354                     yield EntityState.OWNED_JEOPARDY;
355                 }
356                 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
357                      REMOTE_OWNERSHIP_LOST_NO_OWNER -> {
358                     LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
359                     yield EntityState.UNOWNED;
360                 }
361             };
362             return;
363         }
364
365         if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
366             // Pair info message with previous jeopardy
367             LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
368         }
369
370         cleanupEntityState = switch (state) {
371             case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED;
372             case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_LOST_NO_OWNER,
373                  REMOTE_OWNERSHIP_CHANGED -> EntityState.UNOWNED;
374         };
375     }
376
377     @Holding("this")
378     private void serviceOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
379         if (jeopardy) {
380             LOG.info("Service group {} service entity ownership uncertain", identifier);
381             serviceEntityState = switch (state) {
382                 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED_JEOPARDY;
383                 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
384                      REMOTE_OWNERSHIP_LOST_NO_OWNER -> EntityState.UNOWNED;
385             };
386             return;
387         }
388
389         if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
390             // Pair info message with previous jeopardy
391             LOG.info("Service group {} service entity ownership ascertained", identifier);
392         }
393
394         switch (state) {
395             case LOCAL_OWNERSHIP_GRANTED:
396             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
397                 LOG.debug("Service group {} acquired service entity ownership", identifier);
398                 serviceEntityState = EntityState.OWNED;
399                 break;
400             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
401             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
402             case REMOTE_OWNERSHIP_CHANGED:
403             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
404                 LOG.debug("Service group {} lost service entity ownership", identifier);
405                 serviceEntityState = EntityState.UNOWNED;
406                 break;
407             default:
408                 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
409         }
410     }
411
412     // has to be called with lock asserted, which will be released prior to returning
413     private void reconcileState() {
414         // Always check if there is any state change to be applied.
415         while (true) {
416             try {
417                 if (conditionalClean()) {
418                     tryReconcileState();
419                 }
420             } finally {
421                 // We may have ran a round of reconciliation, but the either one of may have happened asynchronously:
422                 // - registration
423                 // - unregistration
424                 // - service future completed
425                 // - entity state changed
426                 //
427                 // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
428                 // be dirty again. This closes the following race condition:
429                 //
430                 // A: runs these checks holding the lock
431                 // B: modifies them, fails to acquire lock
432                 // A: releases lock -> noone takes care of reconciliation
433
434                 unlock();
435             }
436
437             if (isDirty()) {
438                 if (tryLock()) {
439                     LOG.debug("Service group {} re-running reconciliation", identifier);
440                     continue;
441                 }
442
443                 LOG.debug("Service group {} will be reconciled by someone else", identifier);
444             } else {
445                 LOG.debug("Service group {} is completely reconciled", identifier);
446             }
447
448             break;
449         }
450     }
451
452     private void serviceTransitionCompleted() {
453         markDirty();
454         if (tryLock()) {
455             reconcileState();
456         }
457     }
458
459     // Has to be called with lock asserted
460     private void tryReconcileState() {
461         // First take a safe snapshot of current state on which we will base our decisions.
462         final Set<ServiceRegistration> localMembers;
463         final boolean haveCleanup;
464         final boolean haveService;
465         synchronized (this) {
466             if (serviceEntityReg != null) {
467                 haveService = switch (serviceEntityState) {
468                     case OWNED, OWNED_JEOPARDY -> true;
469                     case REGISTERED, UNOWNED, UNREGISTERED -> false;
470                 };
471             } else {
472                 haveService = false;
473             }
474
475             if (haveService && cleanupEntityReg == null) {
476                 // We have the service entity but have not registered for cleanup entity. Do that now and retry.
477                 LOG.debug("Service group {} registering cleanup entity", identifier);
478                 try {
479                     cleanupEntityState = EntityState.REGISTERED;
480                     cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
481                 } catch (CandidateAlreadyRegisteredException e) {
482                     LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
483                     if (serviceEntityReg != null) {
484                         serviceEntityReg.close();
485                         serviceEntityReg = null;
486                     }
487                 }
488                 markDirty();
489                 return;
490             }
491
492             if (cleanupEntityReg != null) {
493                 haveCleanup = switch (cleanupEntityState) {
494                     case OWNED -> true;
495                     case OWNED_JEOPARDY, REGISTERED, UNOWNED, UNREGISTERED -> false;
496                 };
497             } else {
498                 haveCleanup = false;
499             }
500
501             localMembers = ImmutableSet.copyOf(members);
502         }
503
504         if (haveService && haveCleanup) {
505             ensureServicesStarting(localMembers);
506             return;
507         }
508
509         ensureServicesStopping();
510
511         if (!haveService && services.isEmpty()) {
512             LOG.debug("Service group {} has no running services", identifier);
513             final boolean canFinishClose;
514             synchronized (this) {
515                 if (cleanupEntityReg != null) {
516                     LOG.debug("Service group {} releasing cleanup entity", identifier);
517                     cleanupEntityReg.close();
518                     cleanupEntityReg = null;
519                 }
520
521                 canFinishClose = switch (cleanupEntityState) {
522                     case OWNED, OWNED_JEOPARDY, REGISTERED -> false;
523                     case UNOWNED, UNREGISTERED -> true;
524                 };
525             }
526
527             if (canFinishClose) {
528                 final SettableFuture<Void> localFuture = closeFuture.get();
529                 if (localFuture != null && !localFuture.isDone()) {
530                     LOG.debug("Service group {} completing termination", identifier);
531                     localFuture.set(null);
532                 }
533             }
534         }
535     }
536
537     // Has to be called with lock asserted
538     @SuppressWarnings("illegalCatch")
539     private void ensureServicesStarting(final Set<ServiceRegistration> localConfig) {
540         LOG.debug("Service group {} starting services", identifier);
541
542         // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
543         // example when this method is executed as part of unregisterService() call. In that case we need to ensure
544         // services in the list are stopping
545         final var it = services.entrySet().iterator();
546         while (it.hasNext()) {
547             final var entry = it.next();
548             final var reg = entry.getKey();
549             if (!localConfig.contains(reg)) {
550                 final var newInfo = ensureStopping(reg, entry.getValue());
551                 if (newInfo != null) {
552                     entry.setValue(newInfo);
553                 } else {
554                     it.remove();
555                 }
556             }
557         }
558
559         // Now make sure member services are being juggled around
560         for (var reg : localConfig) {
561             if (!services.containsKey(reg)) {
562                 final var service = reg.getInstance();
563                 LOG.debug("Starting service {}", service);
564
565                 try {
566                     service.instantiateServiceInstance();
567                 } catch (Exception e) {
568                     LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
569                         e);
570                     continue;
571                 }
572
573                 services.put(reg, ServiceInfo.STARTED);
574             }
575         }
576     }
577
578     // Has to be called with lock asserted
579     private void ensureServicesStopping() {
580         final var it = services.entrySet().iterator();
581         while (it.hasNext()) {
582             final var entry = it.next();
583             final var newInfo = ensureStopping(entry.getKey(), entry.getValue());
584             if (newInfo != null) {
585                 entry.setValue(newInfo);
586             } else {
587                 it.remove();
588             }
589         }
590     }
591
592     @SuppressWarnings("illegalCatch")
593     private ServiceInfo ensureStopping(final ServiceRegistration reg, final ServiceInfo info) {
594         switch (info.getState()) {
595             case STARTED:
596                 final var service = reg.getInstance();
597
598                 LOG.debug("Service group {} stopping service {}", identifier, service);
599                 final @NonNull ListenableFuture<?> future;
600                 try {
601                     future = verifyNotNull(service.closeServiceInstance());
602                 } catch (Exception e) {
603                     LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
604                         e);
605                     return null;
606                 }
607
608                 Futures.addCallback(future, new FutureCallback<Object>() {
609                     @Override
610                     public void onSuccess(final Object result) {
611                         LOG.debug("Service group {} service {} stopped successfully", identifier, service);
612                         serviceTransitionCompleted();
613                     }
614
615                     @Override
616                     public void onFailure(final Throwable cause) {
617                         LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
618                         serviceTransitionCompleted();
619                     }
620                 }, MoreExecutors.directExecutor());
621                 return info.toState(ServiceState.STOPPING, future);
622             case STOPPING:
623                 if (info.getFuture().isDone()) {
624                     LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
625                     return null;
626                 }
627                 return info;
628             default:
629                 throw new IllegalStateException("Unhandled state " + info.getState());
630         }
631     }
632
633     private void markDirty() {
634         dirty = 1;
635     }
636
637     private boolean isDirty() {
638         return dirty != 0;
639     }
640
641     private boolean conditionalClean() {
642         return DIRTY_UPDATER.compareAndSet(this, 1, 0);
643     }
644
645     private boolean tryLock() {
646         return LOCK_UPDATER.compareAndSet(this, 0, 1);
647     }
648
649     private boolean unlock() {
650         verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
651         return true;
652     }
653
654     @Override
655     public String toString() {
656         return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();
657     }
658 }