ServiceGroupIdentifier should be a record
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ActiveServiceGroup.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.dom.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.base.Verify.verifyNotNull;
14 import static java.util.Objects.requireNonNull;
15
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.MoreObjects;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.SettableFuture;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.checkerframework.checker.lock.qual.GuardedBy;
33 import org.checkerframework.checker.lock.qual.Holding;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
36 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
37 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
38 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Implementation of {@link ServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
46  * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
47  *
48  * <p>
49  * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
50  * a result on new candidates appearing. We use two entities:
51  * <ol>
52  *   <li>service entity, to which all nodes register</li>
53  *   <li>cleanup entity, which only the service entity owner registers to</li>
54  * </ol>
55  *
56  * <p>
57  * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
58  * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
59  * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
60  * granted until the old owner finishes the cleanup.
61  */
62 final class ActiveServiceGroup extends ServiceGroup {
63
64     private enum EntityState {
65         /**
66          * This entity was never registered.
67          */
68         UNREGISTERED,
69         /**
70          * Registration exists, but we are waiting for it to resolve.
71          */
72         REGISTERED,
73         /**
74          * Registration indicated we are the owner.
75          */
76         OWNED,
77         /**
78          * Registration indicated we are the owner, but global state is uncertain -- meaning there can be owners in
79          * another partition, for example.
80          */
81         OWNED_JEOPARDY,
82         /**
83          * Registration indicated we are not the owner. In this state we do not care about global state, therefore we
84          * do not need an UNOWNED_JEOPARDY state.
85          */
86         UNOWNED,
87     }
88
89     enum ServiceState {
90         /**
91          * Local service is up and running.
92          */
93         // FIXME: we should support async startup, which will require a STARTING state.
94         STARTED,
95         /**
96          * Local service is being stopped.
97          */
98         STOPPING,
99     }
100
101     private static final Logger LOG = LoggerFactory.getLogger(ActiveServiceGroup.class);
102
103     private final @NonNull DOMEntityOwnershipService entityOwnershipService;
104     private final @NonNull String identifier;
105
106     /* Entity instances */
107     private final @NonNull DOMEntity serviceEntity;
108     private final @NonNull DOMEntity cleanupEntity;
109
110     private final Set<ServiceRegistration> members = ConcurrentHashMap.newKeySet();
111     // Guarded by lock
112     private final Map<ServiceRegistration, ServiceInfo> services = new HashMap<>();
113
114     // Marker for when any state changed
115     private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> DIRTY_UPDATER =
116             AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "dirty");
117     private volatile int dirty;
118
119     // Simplified lock: non-reentrant, support tryLock() only
120     private static final AtomicIntegerFieldUpdater<ActiveServiceGroup> LOCK_UPDATER =
121             AtomicIntegerFieldUpdater.newUpdater(ActiveServiceGroup.class, "lock");
122     @SuppressWarnings("unused")
123     private volatile int lock;
124
125     /*
126      * State tracking is quite involved, as we are tracking up to four asynchronous sources of events:
127      * - user calling close()
128      * - service entity ownership
129      * - cleanup entity ownership
130      * - service shutdown future
131      *
132      * Absolutely correct solution would be a set of behaviors, which govern each state, remembering where we want to
133      * get to and what we are doing. That would result in ~15 classes which would quickly render this code unreadable
134      * due to boilerplate overhead.
135      *
136      * We therefore take a different approach, tracking state directly in this class and evaluate state transitions
137      * based on recorded bits -- without explicit representation of state machine state.
138      */
139     /**
140      * Group close future. In can only go from null to non-null reference. Whenever it is non-null, it indicates that
141      * the user has closed the group and we are converging to termination.
142      */
143     // We are using volatile get-and-set to support non-blocking close(). It may be more efficient to inline it here,
144     // as we perform a volatile read after unlocking -- that volatile read may easier on L1 cache.
145     // XXX: above needs a microbenchmark contention ever becomes a problem.
146     private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
147
148     /**
149      * Service (base) entity registration. This entity selects an owner candidate across nodes. Candidates proceed to
150      * acquire {@link #cleanupEntity}.
151      */
152     @GuardedBy("this")
153     private Registration serviceEntityReg = null;
154     /**
155      * Service (base) entity last reported state.
156      */
157     @GuardedBy("this")
158     private EntityState serviceEntityState = EntityState.UNREGISTERED;
159
160     /**
161      * Cleanup (owner) entity registration. This entity guards access to service state and coordinates shutdown cleanup
162      * and startup.
163      */
164     @GuardedBy("this")
165     private Registration cleanupEntityReg;
166     /**
167      * Cleanup (owner) entity last reported state.
168      */
169     @GuardedBy("this")
170     private EntityState cleanupEntityState = EntityState.UNREGISTERED;
171
172     private volatile boolean initialized;
173
174     /**
175      * Class constructor. Note: last argument is reused as-is.
176      *
177      * @param identifier non-empty string as identifier
178      * @param serviceEntity as Entity instance
179      * @param cleanupEntity as Entity instance
180      * @param entityOwnershipService GenericEntityOwnershipService instance
181      * @param parent parent service
182      * @param services Services list
183      */
184     ActiveServiceGroup(final String identifier, final DOMEntityOwnershipService entityOwnershipService,
185             final DOMEntity serviceEntity, final DOMEntity cleanupEntity, final List<ServiceRegistration> services) {
186         checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
187         this.identifier = identifier;
188         this.entityOwnershipService = requireNonNull(entityOwnershipService);
189         this.serviceEntity = requireNonNull(serviceEntity);
190         this.cleanupEntity = requireNonNull(cleanupEntity);
191         members.addAll(services);
192
193         LOG.debug("Instantiated new service group for {}", identifier);
194     }
195
196     @VisibleForTesting
197     ActiveServiceGroup(final String identifier, final DOMEntity serviceEntity,
198             final DOMEntity cleanupEntity, final DOMEntityOwnershipService entityOwnershipService) {
199         this(identifier, entityOwnershipService, serviceEntity, cleanupEntity, ImmutableList.of());
200     }
201
202     @Override
203     public String getIdentifier() {
204         return identifier;
205     }
206
207     @Override
208     ListenableFuture<?> closeClusterSingletonGroup() {
209         final var ret = destroyGroup();
210         members.clear();
211         markDirty();
212
213         if (tryLock()) {
214             reconcileState();
215         } else {
216             LOG.debug("Service group {} postponing sync on close", identifier);
217         }
218
219         return ret;
220     }
221
222     private boolean isClosed() {
223         return closeFuture.get() != null;
224     }
225
226     @Override
227     void initialize() throws CandidateAlreadyRegisteredException {
228         verify(tryLock());
229         try {
230             checkState(!initialized, "Singleton group %s was already initilized", identifier);
231             LOG.debug("Initializing service group {} with services {}", identifier, members);
232             synchronized (this) {
233                 serviceEntityState = EntityState.REGISTERED;
234                 serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
235                 initialized = true;
236             }
237         } finally {
238             unlock();
239         }
240     }
241
242     private void checkNotClosed() {
243         checkState(!isClosed(), "Service group %s has already been closed", identifier);
244     }
245
246     @Override
247     void registerService(final ServiceRegistration reg) {
248         final var service = verifyRegistration(reg);
249         checkNotClosed();
250
251         checkState(initialized, "Service group %s is not initialized yet", identifier);
252
253         // First put the service
254         LOG.debug("Adding service {} to service group {}", service, identifier);
255         verify(members.add(reg));
256         markDirty();
257
258         if (!tryLock()) {
259             LOG.debug("Service group {} delayed register of {}", identifier, reg);
260             return;
261         }
262
263         reconcileState();
264     }
265
266     @Override
267     ListenableFuture<?> unregisterService(final ServiceRegistration reg) {
268         verifyRegistration(reg);
269         checkNotClosed();
270
271         verify(members.remove(reg));
272         markDirty();
273         if (members.isEmpty()) {
274             // We need to let AbstractClusterSingletonServiceProviderImpl know this group is to be shutdown
275             // before we start applying state, because while we do not re-enter, the user is free to do whatever,
276             // notably including registering a service with the same ID from the service shutdown hook. That
277             // registration request needs to hit the successor of this group.
278             return destroyGroup();
279         }
280
281         if (tryLock()) {
282             reconcileState();
283         } else {
284             LOG.debug("Service group {} delayed unregister of {}", identifier, reg);
285         }
286         return null;
287     }
288
289     private ClusterSingletonService verifyRegistration(final ServiceRegistration reg) {
290         final var service = reg.getInstance();
291         verify(identifier.equals(service.getIdentifier().value()));
292         return service;
293     }
294
295     private synchronized @NonNull ListenableFuture<?> destroyGroup() {
296         final var future = SettableFuture.<Void>create();
297         final var witness = closeFuture.compareAndExchange(null, future);
298         if (witness != null) {
299             return witness;
300         }
301
302         if (serviceEntityReg != null) {
303             // We are still holding the service registration, close it now...
304             LOG.debug("Service group {} unregistering service entity {}", identifier, serviceEntity);
305             serviceEntityReg.close();
306             serviceEntityReg = null;
307         }
308
309         markDirty();
310         return future;
311     }
312
313     @Override
314     void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change, final boolean inJeopardy) {
315         synchronized (this) {
316             lockedOwnershipChanged(entity, change, inJeopardy);
317         }
318
319         if (isDirty()) {
320             if (!tryLock()) {
321                 LOG.debug("Service group {} postponing ownership change sync", identifier);
322                 return;
323             }
324
325             reconcileState();
326         }
327     }
328
329     /**
330      * Handle an ownership change with the lock held. Callers are expected to handle termination conditions, this method
331      * and anything it calls must not call {@link #lockedClose(SettableFuture)}.
332      *
333      * @param ownershipChange reported change
334      */
335     @Holding("this")
336     private void lockedOwnershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
337             final boolean inJeopardy) {
338         if (serviceEntity.equals(entity)) {
339             serviceOwnershipChanged(change, inJeopardy);
340             markDirty();
341         } else if (cleanupEntity.equals(entity)) {
342             cleanupCandidateOwnershipChanged(change, inJeopardy);
343             markDirty();
344         } else {
345             LOG.warn("Group {} received unrecognized entity {}", identifier, entity);
346         }
347     }
348
349     @Holding("this")
350     private void cleanupCandidateOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
351         if (jeopardy) {
352             cleanupEntityState = switch (state) {
353                 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> {
354                     LOG.warn("Service group {} cleanup entity owned without certainty", identifier);
355                     yield EntityState.OWNED_JEOPARDY;
356                 }
357                 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
358                      REMOTE_OWNERSHIP_LOST_NO_OWNER -> {
359                     LOG.info("Service group {} cleanup entity ownership uncertain", identifier);
360                     yield EntityState.UNOWNED;
361                 }
362             };
363             return;
364         }
365
366         if (cleanupEntityState == EntityState.OWNED_JEOPARDY) {
367             // Pair info message with previous jeopardy
368             LOG.info("Service group {} cleanup entity ownership ascertained", identifier);
369         }
370
371         cleanupEntityState = switch (state) {
372             case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED;
373             case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_LOST_NO_OWNER,
374                  REMOTE_OWNERSHIP_CHANGED -> EntityState.UNOWNED;
375         };
376     }
377
378     @Holding("this")
379     private void serviceOwnershipChanged(final EntityOwnershipStateChange state, final boolean jeopardy) {
380         if (jeopardy) {
381             LOG.info("Service group {} service entity ownership uncertain", identifier);
382             serviceEntityState = switch (state) {
383                 case LOCAL_OWNERSHIP_GRANTED, LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE -> EntityState.OWNED_JEOPARDY;
384                 case LOCAL_OWNERSHIP_LOST_NEW_OWNER, LOCAL_OWNERSHIP_LOST_NO_OWNER, REMOTE_OWNERSHIP_CHANGED,
385                      REMOTE_OWNERSHIP_LOST_NO_OWNER -> EntityState.UNOWNED;
386             };
387             return;
388         }
389
390         if (serviceEntityState == EntityState.OWNED_JEOPARDY) {
391             // Pair info message with previous jeopardy
392             LOG.info("Service group {} service entity ownership ascertained", identifier);
393         }
394
395         switch (state) {
396             case LOCAL_OWNERSHIP_GRANTED:
397             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
398                 LOG.debug("Service group {} acquired service entity ownership", identifier);
399                 serviceEntityState = EntityState.OWNED;
400                 break;
401             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
402             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
403             case REMOTE_OWNERSHIP_CHANGED:
404             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
405                 LOG.debug("Service group {} lost service entity ownership", identifier);
406                 serviceEntityState = EntityState.UNOWNED;
407                 break;
408             default:
409                 LOG.warn("Service group {} ignoring unhandled cleanup entity change {}", identifier, state);
410         }
411     }
412
413     // has to be called with lock asserted, which will be released prior to returning
414     private void reconcileState() {
415         // Always check if there is any state change to be applied.
416         while (true) {
417             try {
418                 if (conditionalClean()) {
419                     tryReconcileState();
420                 }
421             } finally {
422                 // We may have ran a round of reconciliation, but the either one of may have happened asynchronously:
423                 // - registration
424                 // - unregistration
425                 // - service future completed
426                 // - entity state changed
427                 //
428                 // We are dropping the lock, but we need to recheck dirty and try to apply state again if it is found to
429                 // be dirty again. This closes the following race condition:
430                 //
431                 // A: runs these checks holding the lock
432                 // B: modifies them, fails to acquire lock
433                 // A: releases lock -> noone takes care of reconciliation
434
435                 unlock();
436             }
437
438             if (isDirty()) {
439                 if (tryLock()) {
440                     LOG.debug("Service group {} re-running reconciliation", identifier);
441                     continue;
442                 }
443
444                 LOG.debug("Service group {} will be reconciled by someone else", identifier);
445             } else {
446                 LOG.debug("Service group {} is completely reconciled", identifier);
447             }
448
449             break;
450         }
451     }
452
453     private void serviceTransitionCompleted() {
454         markDirty();
455         if (tryLock()) {
456             reconcileState();
457         }
458     }
459
460     // Has to be called with lock asserted
461     private void tryReconcileState() {
462         // First take a safe snapshot of current state on which we will base our decisions.
463         final Set<ServiceRegistration> localMembers;
464         final boolean haveCleanup;
465         final boolean haveService;
466         synchronized (this) {
467             if (serviceEntityReg != null) {
468                 haveService = switch (serviceEntityState) {
469                     case OWNED, OWNED_JEOPARDY -> true;
470                     case REGISTERED, UNOWNED, UNREGISTERED -> false;
471                 };
472             } else {
473                 haveService = false;
474             }
475
476             if (haveService && cleanupEntityReg == null) {
477                 // We have the service entity but have not registered for cleanup entity. Do that now and retry.
478                 LOG.debug("Service group {} registering cleanup entity", identifier);
479                 try {
480                     cleanupEntityState = EntityState.REGISTERED;
481                     cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
482                 } catch (CandidateAlreadyRegisteredException e) {
483                     LOG.error("Service group {} failed to take ownership, aborting", identifier, e);
484                     if (serviceEntityReg != null) {
485                         serviceEntityReg.close();
486                         serviceEntityReg = null;
487                     }
488                 }
489                 markDirty();
490                 return;
491             }
492
493             if (cleanupEntityReg != null) {
494                 haveCleanup = switch (cleanupEntityState) {
495                     case OWNED -> true;
496                     case OWNED_JEOPARDY, REGISTERED, UNOWNED, UNREGISTERED -> false;
497                 };
498             } else {
499                 haveCleanup = false;
500             }
501
502             localMembers = ImmutableSet.copyOf(members);
503         }
504
505         if (haveService && haveCleanup) {
506             ensureServicesStarting(localMembers);
507             return;
508         }
509
510         ensureServicesStopping();
511
512         if (!haveService && services.isEmpty()) {
513             LOG.debug("Service group {} has no running services", identifier);
514             final boolean canFinishClose;
515             synchronized (this) {
516                 if (cleanupEntityReg != null) {
517                     LOG.debug("Service group {} releasing cleanup entity", identifier);
518                     cleanupEntityReg.close();
519                     cleanupEntityReg = null;
520                 }
521
522                 canFinishClose = switch (cleanupEntityState) {
523                     case OWNED, OWNED_JEOPARDY, REGISTERED -> false;
524                     case UNOWNED, UNREGISTERED -> true;
525                 };
526             }
527
528             if (canFinishClose) {
529                 final SettableFuture<Void> localFuture = closeFuture.get();
530                 if (localFuture != null && !localFuture.isDone()) {
531                     LOG.debug("Service group {} completing termination", identifier);
532                     localFuture.set(null);
533                 }
534             }
535         }
536     }
537
538     // Has to be called with lock asserted
539     @SuppressWarnings("illegalCatch")
540     private void ensureServicesStarting(final Set<ServiceRegistration> localConfig) {
541         LOG.debug("Service group {} starting services", identifier);
542
543         // This may look counter-intuitive, but the localConfig may be missing some services that are started -- for
544         // example when this method is executed as part of unregisterService() call. In that case we need to ensure
545         // services in the list are stopping
546         final var it = services.entrySet().iterator();
547         while (it.hasNext()) {
548             final var entry = it.next();
549             final var reg = entry.getKey();
550             if (!localConfig.contains(reg)) {
551                 final var newInfo = ensureStopping(reg, entry.getValue());
552                 if (newInfo != null) {
553                     entry.setValue(newInfo);
554                 } else {
555                     it.remove();
556                 }
557             }
558         }
559
560         // Now make sure member services are being juggled around
561         for (var reg : localConfig) {
562             if (!services.containsKey(reg)) {
563                 final var service = reg.getInstance();
564                 LOG.debug("Starting service {}", service);
565
566                 try {
567                     service.instantiateServiceInstance();
568                 } catch (Exception e) {
569                     LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service,
570                         e);
571                     continue;
572                 }
573
574                 services.put(reg, ServiceInfo.STARTED);
575             }
576         }
577     }
578
579     // Has to be called with lock asserted
580     private void ensureServicesStopping() {
581         final var it = services.entrySet().iterator();
582         while (it.hasNext()) {
583             final var entry = it.next();
584             final var newInfo = ensureStopping(entry.getKey(), entry.getValue());
585             if (newInfo != null) {
586                 entry.setValue(newInfo);
587             } else {
588                 it.remove();
589             }
590         }
591     }
592
593     @SuppressWarnings("illegalCatch")
594     private ServiceInfo ensureStopping(final ServiceRegistration reg, final ServiceInfo info) {
595         switch (info.getState()) {
596             case STARTED:
597                 final var service = reg.getInstance();
598
599                 LOG.debug("Service group {} stopping service {}", identifier, service);
600                 final @NonNull ListenableFuture<?> future;
601                 try {
602                     future = verifyNotNull(service.closeServiceInstance());
603                 } catch (Exception e) {
604                     LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier, service,
605                         e);
606                     return null;
607                 }
608
609                 Futures.addCallback(future, new FutureCallback<Object>() {
610                     @Override
611                     public void onSuccess(final Object result) {
612                         LOG.debug("Service group {} service {} stopped successfully", identifier, service);
613                         serviceTransitionCompleted();
614                     }
615
616                     @Override
617                     public void onFailure(final Throwable cause) {
618                         LOG.debug("Service group {} service {} stopped with error", identifier, service, cause);
619                         serviceTransitionCompleted();
620                     }
621                 }, MoreExecutors.directExecutor());
622                 return info.toState(ServiceState.STOPPING, future);
623             case STOPPING:
624                 if (info.getFuture().isDone()) {
625                     LOG.debug("Service group {} removed stopped service {}", identifier, reg.getInstance());
626                     return null;
627                 }
628                 return info;
629             default:
630                 throw new IllegalStateException("Unhandled state " + info.getState());
631         }
632     }
633
634     private void markDirty() {
635         dirty = 1;
636     }
637
638     private boolean isDirty() {
639         return dirty != 0;
640     }
641
642     private boolean conditionalClean() {
643         return DIRTY_UPDATER.compareAndSet(this, 1, 0);
644     }
645
646     private boolean tryLock() {
647         return LOCK_UPDATER.compareAndSet(this, 0, 1);
648     }
649
650     private boolean unlock() {
651         verify(LOCK_UPDATER.compareAndSet(this, 1, 0));
652         return true;
653     }
654
655     @Override
656     public String toString() {
657         return MoreObjects.toStringHelper(this).add("identifier", identifier).toString();
658     }
659 }