279129adc012d60ff9c4ce28dfc914653338308d
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.CheckReturnValue;
25 import javax.annotation.concurrent.GuardedBy;
26 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
27 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
33 import org.opendaylight.yangtools.concepts.Path;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entity Ownership Service. Since EOS is atomic
39  * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
40  *
41  * <p>
42  * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
43  * a result of new candidates appearing. We use two entities:
44  * - service entity, to which all nodes register
45  * - cleanup entity, which only the service entity owner registers to
46  *
47  * <p>
48  * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
49  * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
50  * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
51  * granted until the old owner finishes the cleanup.
52  *
53  * @param <P> the instance identifier path type
54  * @param <E> the GenericEntity type
55  * @param <C> the GenericEntityOwnershipChange type
56  * @param <G> the GenericEntityOwnershipListener type
57  * @param <S> the GenericEntityOwnershipService type
58  */
59 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
60         C extends GenericEntityOwnershipChange<P, E>,  G extends GenericEntityOwnershipListener<P, C>,
61         S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
    /**
     * Lifecycle states of this service group. The current value is kept in the {@code state} field, which is
     * annotated as guarded by {@code lock}.
     */
    private enum State {
        /**
         * This group has been freshly allocated and has not been started yet.
         */
        INITIAL,
        /**
         * Operational state. Service entity is registered, but ownership was not resolved yet.
         */
        REGISTERED,
        /**
         * Operational state. Service entity confirmed to be follower.
         */
        STANDBY,
        /**
         * Service entity acquired. Attempting to acquire cleanup entity.
         */
        TAKING_OWNERSHIP,
        /**
         * Both entities held and user services are being started.
         *
         * <p>
         * NOTE(review): nothing in this class assigns this state; {@code startServices()} switches straight to
         * {@link #OWNER}. It is only ever compared against.
         */
        STARTING_SERVICES,
        /**
         * Steady state. Both entities held and services have finished starting.
         */
        OWNER,
        /**
         * User services are being stopped due to either loss of an entity or a shutdown.
         */
        STOPPING_SERVICES,
        /**
         * We have stopped services and are now relinquishing the cleanup entity.
         */
        RELEASING_OWNERSHIP,
        /**
         * Terminated, this group cannot be used anymore.
         */
        TERMINATED
    }
100
    private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);

    /* Ownership service through which candidates for both entities are registered */
    private final S entityOwnershipService;
    /* Group identifier; the constructor rejects an empty string */
    private final String identifier;

    /* Entity instances */
    private final E serviceEntity;
    private final E cleanupEntity;

    /*
     * Holds null until closeClusterSingletonGroup() is invoked. A non-null value is what isClosed() and
     * checkNotClosed() treat as "closed"; terminate() completes the future it is handed.
     */
    private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
    /* Created with fairness enabled; guards the mutable fields below */
    private final ReentrantLock lock = new ReentrantLock(true);

    /* Services registered in this group; the list instance is supplied to the constructor */
    @GuardedBy("lock")
    private final List<ClusterSingletonService> serviceGroup;

    @GuardedBy("lock")
    private State state = State.INITIAL;

    /*
     * When non-null, ownershipChanged() appends incoming changes here instead of processing them.
     * initialize() installs a list here around the service candidate registration.
     */
    @GuardedBy("lock")
    private List<C> capture;

    /* EOS Candidate Registrations */
    @GuardedBy("lock")
    private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
    @GuardedBy("lock")
    private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
127
128     /**
129      * Class constructor. Note: last argument is reused as-is.
130      *
131      * @param identifier non-empty string as identifier
132      * @param mainEntity as Entity instance
133      * @param closeEntity as Entity instance
134      * @param entityOwnershipService GenericEntityOwnershipService instance
135      * @param parent parent service
136      * @param services Services list
137      */
138     ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
139             final E closeEntity, final List<ClusterSingletonService> services) {
140         Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
141         this.identifier = identifier;
142         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
143         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
144         this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
145         this.serviceGroup = Preconditions.checkNotNull(services);
146         LOG.debug("Instantiated new service group for {}", identifier);
147     }
148
    /**
     * Test-only convenience constructor. Delegates to the primary constructor with a fresh, empty
     * {@link ArrayList} (initial capacity 1) as the service list. Note the argument order differs from the
     * primary constructor: the ownership service comes last here.
     *
     * @param identifier non-empty string as identifier
     * @param mainEntity as Entity instance
     * @param closeEntity as Entity instance
     * @param entityOwnershipService GenericEntityOwnershipService instance
     */
    @VisibleForTesting
    ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
            final E closeEntity, final S entityOwnershipService) {
        this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
    }
154
    /**
     * Returns the identifier this group was constructed with.
     *
     * @return group identifier, never empty
     */
    @Override
    public String getIdentifier() {
        return identifier;
    }
159
160     @Override
161     ListenableFuture<?> closeClusterSingletonGroup() {
162         // Assert our future first
163         final SettableFuture<Void> future = SettableFuture.create();
164         final SettableFuture<Void> existing = closeFuture.getAndSet(future);
165         if (existing != null) {
166             return existing;
167         }
168
169         if (!lock.tryLock()) {
170             // The lock is held, the cleanup will be finished by the owner thread
171             LOG.debug("Singleton group {} cleanup postponed", identifier);
172             return future;
173         }
174
175         try {
176             lockedClose(future);
177         } finally {
178             lock.unlock();
179         }
180
181         LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
182         return future;
183     }
184
    /**
     * Checks whether close has been requested, i.e. whether a close future has been installed.
     *
     * @return true if {@code closeFuture} holds a non-null future
     */
    private boolean isClosed() {
        return closeFuture.get() != null;
    }
188
    /**
     * Switches this group to a new state, logging the old and the new state at debug level first.
     *
     * @param newState state to switch to, verified to be non-null
     */
    @GuardedBy("lock")
    private void updateState(final State newState) {
        LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
        state = Verify.verifyNotNull(newState);
    }
194
195     @GuardedBy("lock")
196     private void lockedClose(final SettableFuture<Void> future) {
197         if (serviceEntityReg != null) {
198             LOG.debug("Service group {} unregistering", identifier);
199             serviceEntityReg.close();
200             serviceEntityReg = null;
201         }
202
203         switch (state) {
204             case INITIAL:
205                 // Not started: not much to do
206                 terminate(future);
207                 break;
208             case TERMINATED:
209                 // Already done: no-op
210                 break;
211             case REGISTERED:
212             case STANDBY:
213                 LOG.debug("Service group {} terminated", identifier);
214                 terminate(future);
215                 break;
216             case OWNER:
217                 // No-op, we will react to the loss of registration instead.
218                 break;
219             case STOPPING_SERVICES:
220                 // Waiting for services. Will resume once we get notified.
221                 break;
222             case RELEASING_OWNERSHIP:
223                 // Waiting for cleanup entity to flip, will resume afterwards.
224                 break;
225             case TAKING_OWNERSHIP:
226                 // Abort taking of ownership and close
227                 LOG.debug("Service group {} aborting ownership bid", identifier);
228                 cleanupEntityReg.close();
229                 cleanupEntityReg = null;
230                 updateState(State.RELEASING_OWNERSHIP);
231                 break;
232             default:
233                 throw new IllegalStateException("Unhandled state " + state);
234         }
235     }
236
    /**
     * Switches to {@code TERMINATED} and completes the supplied future with a null value. The completion is
     * verified, so this fails if the future has already been completed.
     *
     * @param future close future to complete
     */
    @GuardedBy("lock")
    private void terminate(final SettableFuture<Void> future) {
        updateState(State.TERMINATED);
        Verify.verify(future.set(null));
    }
242
243     @Override
244     void initialize() throws CandidateAlreadyRegisteredException {
245         LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
246
247         lock.lock();
248         try {
249             Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
250                     state);
251
252             // Catch events if they fire during this call
253             capture = new ArrayList<>(0);
254             serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
255             state = State.REGISTERED;
256
257             final List<C> captured = capture;
258             capture = null;
259             captured.forEach(this::lockedOwnershipChanged);
260         } finally {
261             lock.unlock();
262         }
263     }
264
265     private void checkNotClosed() {
266         Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
267                 identifier);
268     }
269
270     @Override
271     void registerService(final ClusterSingletonService service) {
272         Verify.verify(identifier.equals(service.getIdentifier().getValue()));
273         checkNotClosed();
274
275         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
276
277         lock.lock();
278         try {
279             Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
280             serviceGroup.add(service);
281
282             switch (state) {
283                 case OWNER:
284                 case STARTING_SERVICES:
285                     service.instantiateServiceInstance();
286                     break;
287                 default:
288                     break;
289             }
290         } finally {
291             lock.unlock();
292         }
293     }
294
295     @CheckReturnValue
296     @Override
297     boolean unregisterService(final ClusterSingletonService service) {
298         Verify.verify(identifier.equals(service.getIdentifier().getValue()));
299         checkNotClosed();
300
301         lock.lock();
302         try {
303             // There is a slight problem here, as the type does not match the list type, hence we need to tread
304             // carefully.
305             if (serviceGroup.size() == 1) {
306                 Verify.verify(serviceGroup.contains(service));
307                 return true;
308             }
309
310             Verify.verify(serviceGroup.remove(service));
311             LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
312
313             switch (state) {
314                 case OWNER:
315                 case STARTING_SERVICES:
316                     service.closeServiceInstance();
317                     break;
318                 default:
319                     break;
320             }
321
322             return false;
323         } finally {
324             lock.unlock();
325             finishCloseIfNeeded();
326         }
327     }
328
329     @Override
330     void ownershipChanged(final C ownershipChange) {
331         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
332
333         lock.lock();
334         try {
335             if (capture != null) {
336                 capture.add(ownershipChange);
337             } else {
338                 lockedOwnershipChanged(ownershipChange);
339             }
340         } finally {
341             lock.unlock();
342             finishCloseIfNeeded();
343         }
344     }
345
346     private void lockedOwnershipChanged(final C ownershipChange) {
347         if (ownershipChange.inJeopardy()) {
348             LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
349             lostOwnership();
350             return;
351         }
352
353         final E entity = ownershipChange.getEntity();
354         if (serviceEntity.equals(entity)) {
355             serviceOwnershipChanged(ownershipChange);
356         } else if (cleanupEntity.equals(entity)) {
357             cleanupCandidateOwnershipChanged(ownershipChange);
358         } else {
359             LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
360         }
361     }
362
363     private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
364         switch (ownershipChange.getState()) {
365             case LOCAL_OWNERSHIP_GRANTED:
366                 switch (state) {
367                     case TAKING_OWNERSHIP:
368                         // SLAVE to MASTER
369                         startServices();
370                         return;
371                     default:
372                         break;
373                 }
374                 break;
375             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
376             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
377                 switch (state) {
378                     case RELEASING_OWNERSHIP:
379                         // Slight cheat: if we are closing down, we just need to notify the future
380                         updateState(isClosed() ? State.INITIAL : State.STANDBY);
381                         return;
382                     case STARTING_SERVICES:
383                     case OWNER:
384                     case TAKING_OWNERSHIP:
385                         LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
386                         return;
387                     default:
388                         break;
389                 }
390
391                 break;
392             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
393             case REMOTE_OWNERSHIP_CHANGED:
394             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
395             default:
396                 break;
397         }
398
399         LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
400     }
401
402     private void serviceOwnershipChanged(final C ownershipChange) {
403         switch (ownershipChange.getState()) {
404             case LOCAL_OWNERSHIP_GRANTED:
405                 // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
406                 takeOwnership();
407                 break;
408             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
409             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
410                 // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
411                 lostOwnership();
412                 break;
413             default:
414                 // Not needed notifications
415                 LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
416                     ownershipChange);
417         }
418     }
419
420     private void finishCloseIfNeeded() {
421         final SettableFuture<Void> future = closeFuture.get();
422         if (future != null) {
423             lock.lock();
424             try {
425                 lockedClose(future);
426             } finally {
427                 lock.unlock();
428             }
429         }
430     }
431
    /*
     * Helper method which registers a candidate for the cleanup entity. It is the first step
     * before this instance takes leadership. Nothing happens if the group has already been closed.
     *
     * NOTE(review): the state is switched to TAKING_OWNERSHIP before the registration is attempted. If
     * the registration throws CandidateAlreadyRegisteredException, the error is only logged: the state
     * remains TAKING_OWNERSHIP and cleanupEntityReg is not assigned by this call.
     */
    private void takeOwnership() {
        if (isClosed()) {
            LOG.debug("Service group {} is closed, not taking ownership", identifier);
            return;
        }

        LOG.debug("Group {} taking ownership", identifier);

        updateState(State.TAKING_OWNERSHIP);
        try {
            cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
        } catch (CandidateAlreadyRegisteredException e) {
            LOG.error("Service group {} failed to take ownership", identifier, e);
        }
    }
451
452     /*
453      * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
454      */
455     @SuppressWarnings("checkstyle:IllegalCatch")
456     private void startServices() {
457         if (isClosed()) {
458             LOG.debug("Service group {} is closed, not starting services", identifier);
459             return;
460         }
461
462         LOG.debug("Service group {} starting services", identifier);
463         serviceGroup.forEach(service -> {
464             LOG.debug("Starting service {}", service);
465             try {
466                 service.instantiateServiceInstance();
467             } catch (Exception e) {
468                 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
469             }
470         });
471
472         LOG.debug("Service group {} services started", identifier);
473         updateState(State.OWNER);
474     }
475
476     /*
477      * Help method calls suspendService method for stop this single cluster-wide service instance.
478      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
479      * new election for DoubleCandidateEntity.
480      */
481     private void lostOwnership() {
482         LOG.debug("Service group {} lost ownership in state {}", identifier, state);
483         switch (state) {
484             case REGISTERED:
485                 updateState(State.STANDBY);
486                 break;
487             case OWNER:
488                 stopServices();
489                 break;
490             case STARTING_SERVICES:
491             case STOPPING_SERVICES:
492                 // No-op, as these will re-check state before proceeding
493                 break;
494             case TAKING_OWNERSHIP:
495                 cleanupEntityReg.close();
496                 cleanupEntityReg = null;
497                 updateState(State.STANDBY);
498                 break;
499             case INITIAL:
500             case TERMINATED:
501             default:
502                 LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
503                 break;
504         }
505     }
506
507     @SuppressWarnings("checkstyle:IllegalCatch")
508     void stopServices() {
509         updateState(State.STOPPING_SERVICES);
510
511         final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
512         for (final ClusterSingletonService service : serviceGroup) {
513             final ListenableFuture<Void> future;
514
515             try {
516                 future = service.closeServiceInstance();
517             } catch (Exception e) {
518                 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
519                     service, e);
520                 continue;
521             }
522
523             serviceCloseFutureList.add(future);
524         }
525
526         Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
527             @Override
528             public void onFailure(final Throwable cause) {
529                 LOG.warn("Service group {} service stopping reported error", identifier, cause);
530                 onServicesStopped();
531             }
532
533             @Override
534             public void onSuccess(final List<Void> nulls) {
535                 onServicesStopped();
536             }
537         }, MoreExecutors.directExecutor());
538     }
539
540     void onServicesStopped() {
541         LOG.debug("Service group {} finished stopping services", identifier);
542         lock.lock();
543         try {
544             if (cleanupEntityReg != null) {
545                 updateState(State.RELEASING_OWNERSHIP);
546                 cleanupEntityReg.close();
547                 cleanupEntityReg = null;
548             } else {
549                 updateState(State.STANDBY);
550             }
551         } finally {
552             lock.unlock();
553             finishCloseIfNeeded();
554         }
555     }
556
557     @Override
558     public String toString() {
559         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
560     }
561 }