82897e6c60366571ee430dd1e675b93194ca9fa8
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.annotation.CheckReturnValue;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
26 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
27 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
31 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
32 import org.opendaylight.yangtools.concepts.Path;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Implementation of {@link ClusterSingletonServiceGroup} on top of the Entitiy Ownership Service. Since EOS is atomic
38  * in its operation and singleton services incur startup and most notably cleanup, we need to do something smart here.
39  *
40  * <p>
41  * The implementation takes advantage of the fact that EOS provides stable ownership, i.e. owners are not moved as
42  * a result on new candidates appearing. We use two entities:
43  * - service entity, to which all nodes register
44  * - cleanup entity, which only the service entity owner registers to
45  *
46  * <p>
47  * Once the cleanup entity ownership is acquired, services are started. As long as the cleanup entity is registered,
48  * it should remain the owner. In case a new service owner emerges, the old owner will start the cleanup process,
49  * eventually releasing the cleanup entity. The new owner registers for the cleanup entity -- but will not see it
50  * granted until the old owner finishes the cleanup.
51  *
52  * @param <P> the instance identifier path type
53  * @param <E> the GenericEntity type
54  * @param <C> the GenericEntityOwnershipChange type
55  * @param <G> the GenericEntityOwnershipListener type
56  * @param <S> the GenericEntityOwnershipService type
57  */
58 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
59         C extends GenericEntityOwnershipChange<P, E>,  G extends GenericEntityOwnershipListener<P, C>,
60         S extends GenericEntityOwnershipService<P, E, G>> extends ClusterSingletonServiceGroup<P, E, C> {
61     private enum State {
62         /**
63          * This group has been freshly allocated and has not been started yet.
64          */
65         INITIAL,
66         /**
67          * Operational state. Service entity is registered, but ownership was not resolved yet.
68          */
69         REGISTERED,
70         /**
71          * Operational state. Service entity confirmed to be follower.
72          */
73         STANDBY,
74         /**
75          * Service entity acquired. Attempting to acquire cleanup entity.
76          */
77         TAKING_OWNERSHIP,
78         /**
79          * Both entities held and user services are being started.
80          */
81         STARTING_SERVICES,
82         /**
83          * Steady state. Both entities held and services have finished starting.
84          */
85         OWNER,
86         /**
87          * User services are being stopped due to either loss of an entity or a shutdown.
88          */
89         STOPPING_SERVICES,
90         /**
91          * We have stopped services and are now relinquishing the cleanup entity.
92          */
93         RELEASING_OWNERSHIP,
94         /**
95          * Terminated, this group cannot be used anymore.
96          */
97         TERMINATED
98     }
99
100     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class);
101
102     private final S entityOwnershipService;
103     private final String identifier;
104
105     /* Entity instances */
106     private final E serviceEntity;
107     private final E cleanupEntity;
108
109     private final AtomicReference<SettableFuture<Void>> closeFuture = new AtomicReference<>();
110     private final ReentrantLock lock = new ReentrantLock(true);
111
112     @GuardedBy("lock")
113     private final List<ClusterSingletonService> serviceGroup;
114
115     @GuardedBy("lock")
116     private State state = State.INITIAL;
117
118     @GuardedBy("lock")
119     private List<C> capture;
120
121     /* EOS Candidate Registrations */
122     @GuardedBy("lock")
123     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityReg;
124     @GuardedBy("lock")
125     private GenericEntityOwnershipCandidateRegistration<P, E> cleanupEntityReg;
126
127     /**
128      * Class constructor. Note: last argument is reused as-is.
129      *
130      * @param identifier non-empty string as identifier
131      * @param mainEntity as Entity instance
132      * @param closeEntity as Entity instance
133      * @param entityOwnershipService GenericEntityOwnershipService instance
134      * @param parent parent service
135      * @param services Services list
136      */
137     ClusterSingletonServiceGroupImpl(final String identifier, final S entityOwnershipService, final E mainEntity,
138             final E closeEntity, final List<ClusterSingletonService> services) {
139         Preconditions.checkArgument(!identifier.isEmpty(), "Identifier may not be empty");
140         this.identifier = identifier;
141         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
142         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
143         this.cleanupEntity = Preconditions.checkNotNull(closeEntity);
144         this.serviceGroup = Preconditions.checkNotNull(services);
145         LOG.debug("Instantiated new service group for {}", identifier);
146     }
147
148     @VisibleForTesting
149     ClusterSingletonServiceGroupImpl(final String identifier, final E mainEntity,
150             final E closeEntity, final S entityOwnershipService) {
151         this(identifier, entityOwnershipService, mainEntity, closeEntity, new ArrayList<>(1));
152     }
153
154     @Override
155     public String getIdentifier() {
156         return identifier;
157     }
158
159     @Override
160     ListenableFuture<?> closeClusterSingletonGroup() {
161         // Assert our future first
162         final SettableFuture<Void> future = SettableFuture.create();
163         final SettableFuture<Void> existing = closeFuture.getAndSet(future);
164         if (existing != null) {
165             return existing;
166         }
167
168         if (!lock.tryLock()) {
169             // The lock is held, the cleanup will be finished by the owner thread
170             LOG.debug("Singleton group {} cleanup postponed", identifier);
171             return future;
172         }
173
174         try {
175             lockedClose(future);
176         } finally {
177             lock.unlock();
178         }
179
180         LOG.debug("Service group {} {}", identifier, future.isDone() ? "closed" : "closing");
181         return future;
182     }
183
184     private boolean isClosed() {
185         return closeFuture.get() != null;
186     }
187
188     @GuardedBy("lock")
189     private void updateState(final State newState) {
190         LOG.debug("Service group {} switching from {} to {}", identifier, state, newState);
191         state = Verify.verifyNotNull(newState);
192     }
193
194     @GuardedBy("lock")
195     private void lockedClose(final SettableFuture<Void> future) {
196         if (serviceEntityReg != null) {
197             LOG.debug("Service group {} unregistering", identifier);
198             serviceEntityReg.close();
199             serviceEntityReg = null;
200         }
201
202         switch (state) {
203             case INITIAL:
204                 // Not started: not much to do
205                 terminate(future);
206                 break;
207             case TERMINATED:
208                 // Already done: no-op
209                 break;
210             case REGISTERED:
211             case STANDBY:
212                 LOG.debug("Service group {} terminated", identifier);
213                 terminate(future);
214                 break;
215             case OWNER:
216                 // No-op, we will react to the loss of registration instead.
217                 break;
218             case STOPPING_SERVICES:
219                 // Waiting for services. Will resume once we get notified.
220                 break;
221             case RELEASING_OWNERSHIP:
222                 // Waiting for cleanup entity to flip, will resume afterwards.
223                 break;
224             case TAKING_OWNERSHIP:
225                 // Abort taking of ownership and close
226                 LOG.debug("Service group {} aborting ownership bid", identifier);
227                 cleanupEntityReg.close();
228                 cleanupEntityReg = null;
229                 updateState(State.RELEASING_OWNERSHIP);
230                 break;
231             default:
232                 throw new IllegalStateException("Unhandled state " + state);
233         }
234     }
235
236     @GuardedBy("lock")
237     private void terminate(final SettableFuture<Void> future) {
238         updateState(State.TERMINATED);
239         Verify.verify(future.set(null));
240     }
241
242     @Override
243     void initialize() throws CandidateAlreadyRegisteredException {
244         LOG.debug("Initialization ClusterSingletonGroup {}", identifier);
245
246         lock.lock();
247         try {
248             Preconditions.checkState(state == State.INITIAL, "Unexpected singleton group %s state %s", identifier,
249                     state);
250
251             // Catch events if they fire during this call
252             capture = new ArrayList<>(0);
253             serviceEntityReg = entityOwnershipService.registerCandidate(serviceEntity);
254             state = State.REGISTERED;
255
256             final List<C> captured = capture;
257             capture = null;
258             captured.forEach(this::lockedOwnershipChanged);
259         } finally {
260             lock.unlock();
261         }
262     }
263
264     private void checkNotClosed() {
265         Preconditions.checkState(closeFuture.get() == null, "Service group %s has already been closed",
266                 identifier);
267     }
268
269     @Override
270     void registerService(final ClusterSingletonService service) {
271         Verify.verify(identifier.equals(service.getIdentifier().getValue()));
272         checkNotClosed();
273
274         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", identifier);
275
276         lock.lock();
277         try {
278             Preconditions.checkState(state != State.INITIAL, "Service group %s is not initialized yet", identifier);
279             serviceGroup.add(service);
280
281             switch (state) {
282                 case OWNER:
283                 case STARTING_SERVICES:
284                     service.instantiateServiceInstance();
285                     break;
286                 default:
287                     break;
288             }
289         } finally {
290             lock.unlock();
291         }
292     }
293
294     @CheckReturnValue
295     @Override
296     boolean unregisterService(final ClusterSingletonService service) {
297         Verify.verify(identifier.equals(service.getIdentifier().getValue()));
298         checkNotClosed();
299
300         lock.lock();
301         try {
302             // There is a slight problem here, as the type does not match the list type, hence we need to tread
303             // carefully.
304             if (serviceGroup.size() == 1) {
305                 Verify.verify(serviceGroup.contains(service));
306                 return true;
307             }
308
309             Verify.verify(serviceGroup.remove(service));
310             LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
311
312             switch (state) {
313                 case OWNER:
314                 case STARTING_SERVICES:
315                     service.closeServiceInstance();
316                     break;
317                 default:
318                     break;
319             }
320
321             return false;
322         } finally {
323             lock.unlock();
324             finishCloseIfNeeded();
325         }
326     }
327
328     @Override
329     void ownershipChanged(final C ownershipChange) {
330         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, identifier);
331
332         lock.lock();
333         try {
334             if (capture != null) {
335                 capture.add(ownershipChange);
336             } else {
337                 lockedOwnershipChanged(ownershipChange);
338             }
339         } finally {
340             lock.unlock();
341             finishCloseIfNeeded();
342         }
343     }
344
345     private void lockedOwnershipChanged(final C ownershipChange) {
346         if (ownershipChange.inJeopardy()) {
347             LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
348             lostOwnership();
349             return;
350         }
351
352         final E entity = ownershipChange.getEntity();
353         if (serviceEntity.equals(entity)) {
354             serviceOwnershipChanged(ownershipChange);
355         } else if (cleanupEntity.equals(entity)) {
356             cleanupCandidateOwnershipChanged(ownershipChange);
357         } else {
358             LOG.warn("Group {} received unrecognized change {}", identifier, ownershipChange);
359         }
360     }
361
362     private void cleanupCandidateOwnershipChanged(final C ownershipChange) {
363         switch (ownershipChange.getState()) {
364             case LOCAL_OWNERSHIP_GRANTED:
365                 switch (state) {
366                     case TAKING_OWNERSHIP:
367                         // SLAVE to MASTER
368                         startServices();
369                         return;
370                     default:
371                         break;
372                 }
373                 break;
374             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
375             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
376                 switch (state) {
377                     case RELEASING_OWNERSHIP:
378                         // Slight cheat: if we are closing down, we just need to notify the future
379                         updateState(isClosed() ? State.INITIAL : State.STANDBY);
380                         return;
381                     case STARTING_SERVICES:
382                     case OWNER:
383                     case TAKING_OWNERSHIP:
384                         LOG.warn("Group {} lost cleanup ownership in state {}", identifier, state);
385                         return;
386                     default:
387                         break;
388                 }
389
390                 break;
391             case LOCAL_OWNERSHIP_RETAINED_WITH_NO_CHANGE:
392             case REMOTE_OWNERSHIP_CHANGED:
393             case REMOTE_OWNERSHIP_LOST_NO_OWNER:
394             default:
395                 break;
396         }
397
398         LOG.debug("Group {} in state {} ignoring cleanup OwnershipChange {}", identifier, state, ownershipChange);
399     }
400
401     private void serviceOwnershipChanged(final C ownershipChange) {
402         switch (ownershipChange.getState()) {
403             case LOCAL_OWNERSHIP_GRANTED:
404                 // SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
405                 takeOwnership();
406                 break;
407             case LOCAL_OWNERSHIP_LOST_NEW_OWNER:
408             case LOCAL_OWNERSHIP_LOST_NO_OWNER:
409                 // MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
410                 lostOwnership();
411                 break;
412             default:
413                 // Not needed notifications
414                 LOG.debug("Group {} in state {} not processed entity OwnershipChange {}", identifier, state,
415                     ownershipChange);
416         }
417     }
418
419     private void finishCloseIfNeeded() {
420         final SettableFuture<Void> future = closeFuture.get();
421         if (future != null) {
422             lock.lock();
423             try {
424                 lockedClose(future);
425             } finally {
426                 lock.unlock();
427             }
428         }
429     }
430
431     /*
432      * Help method to registered DoubleCandidateEntity. It is first step
433      * before the actual instance take Leadership.
434      */
435     private void takeOwnership() {
436         if (isClosed()) {
437             LOG.debug("Service group {} is closed, not taking ownership", identifier);
438             return;
439         }
440
441         LOG.debug("Group {} taking ownership", identifier);
442
443         updateState(State.TAKING_OWNERSHIP);
444         try {
445             cleanupEntityReg = entityOwnershipService.registerCandidate(cleanupEntity);
446         } catch (CandidateAlreadyRegisteredException e) {
447             LOG.error("Service group {} failed to take ownership", identifier, e);
448         }
449     }
450
451     /*
452      * Help method calls instantiateServiceInstance method for create single cluster-wide service instance.
453      */
454     @SuppressWarnings("checkstyle:IllegalCatch")
455     private void startServices() {
456         if (isClosed()) {
457             LOG.debug("Service group {} is closed, not starting services", identifier);
458             return;
459         }
460
461         LOG.debug("Service group {} starting services", identifier);
462         serviceGroup.forEach(service -> {
463             LOG.debug("Starting service {}", service);
464             try {
465                 service.instantiateServiceInstance();
466             } catch (Exception e) {
467                 LOG.warn("Service group {} service {} failed to start, attempting to continue", identifier, service, e);
468             }
469         });
470
471         LOG.debug("Service group {} services started", identifier);
472         updateState(State.OWNER);
473     }
474
475     /*
476      * Help method calls suspendService method for stop this single cluster-wide service instance.
477      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
478      * new election for DoubleCandidateEntity.
479      */
480     private void lostOwnership() {
481         LOG.debug("Service group {} lost ownership in state {}", identifier, state);
482         switch (state) {
483             case REGISTERED:
484                 updateState(State.STANDBY);
485                 break;
486             case OWNER:
487                 stopServices();
488                 break;
489             case STARTING_SERVICES:
490             case STOPPING_SERVICES:
491                 // No-op, as these will re-check state before proceeding
492                 break;
493             case TAKING_OWNERSHIP:
494                 cleanupEntityReg.close();
495                 cleanupEntityReg = null;
496                 updateState(State.STANDBY);
497                 break;
498             case INITIAL:
499             case TERMINATED:
500             default:
501                 LOG.info("Service group {} ignoring lost ownership in state {},", identifier, state);
502                 break;
503         }
504     }
505
506     @SuppressWarnings("checkstyle:IllegalCatch")
507     void stopServices() {
508         updateState(State.STOPPING_SERVICES);
509
510         final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>(serviceGroup.size());
511         for (final ClusterSingletonService service : serviceGroup) {
512             final ListenableFuture<Void> future;
513
514             try {
515                 future = service.closeServiceInstance();
516             } catch (Exception e) {
517                 LOG.warn("Service group {} service {} failed to stop, attempting to continue", identifier,
518                     service, e);
519                 continue;
520             }
521
522             serviceCloseFutureList.add(future);
523         }
524
525         Futures.addCallback(Futures.allAsList(serviceCloseFutureList), new FutureCallback<List<Void>>() {
526             @Override
527             public void onFailure(final Throwable cause) {
528                 LOG.warn("Service group {} service stopping reported error", identifier, cause);
529                 onServicesStopped();
530             }
531
532             @Override
533             public void onSuccess(final List<Void> nulls) {
534                 onServicesStopped();
535             }
536         });
537     }
538
539     void onServicesStopped() {
540         LOG.debug("Service group {} finished stopping services", identifier);
541         lock.lock();
542         try {
543             if (cleanupEntityReg != null) {
544                 updateState(State.RELEASING_OWNERSHIP);
545                 cleanupEntityReg.close();
546                 cleanupEntityReg = null;
547             } else {
548                 updateState(State.STANDBY);
549             }
550         } finally {
551             lock.unlock();
552             finishCloseIfNeeded();
553         }
554     }
555
556     @Override
557     public String toString() {
558         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("state", state).toString();
559     }
560 }