BUG-7222: Improve ClusterSingletonService error handling.
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.yangtools.concepts.Path;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Implementation of {@link ClusterSingletonServiceGroup}
42  *
43  * @param <P> the instance identifier path type
44  * @param <E> the GenericEntity type
45  * @param <C> the GenericEntityOwnershipChange type
46  * @param <G> the GenericEntityOwnershipListener type
47  * @param <S> the GenericEntityOwnershipService type
48  */
49 @VisibleForTesting
50 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
51                                              C extends GenericEntityOwnershipChange<P, E>,
52                                              G extends GenericEntityOwnershipListener<P, C>,
53                                              S extends GenericEntityOwnershipService<P, E, G>>
54         implements ClusterSingletonServiceGroup<P, E, C> {
55
56     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
57
58     private final S entityOwnershipService;
59     private final String clusterSingletonGroupIdentifier;
60     private final Semaphore clusterLock = new Semaphore(1, true);
61
62     /* Entity instances */
63     private final E serviceEntity;
64     private final E doubleCandidateEntity;
65
66     // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
67     // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
68     // FOLLOWER : baseCandidate is registered correctly
69     // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
70     // LEADER : both candidate have mastership from EOS
71     // TERMINATED : service go down
72     @GuardedBy("clusterLock")
73     private boolean hasOwnership = false;
74     @GuardedBy("clusterLock")
75     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
76     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
77
78     /* EOS Candidate Registrations */
79     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
80     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
81
82     /**
83      * Class constructor
84      *
85      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
86      * @param mainEntity
87      * @param closeEntity
88      * @param entityOwnershipService
89      */
90     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
91             final E closeEntity, final S entityOwnershipService,
92             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
93         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
94         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
95         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
96         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
97         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
98         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
99         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
100     }
101
102     @Override
103     public final ListenableFuture<List<Void>> closeClusterSingletonGroup() {
104         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
105         boolean needReleaseLock = false;
106         final ListenableFuture<List<Void>> destroyFuture;
107         try {
108             needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
109         } catch (final Exception e) {
110             LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
111                     e);
112         } finally {
113             if (serviceEntityCandidateReg != null) {
114                 serviceEntityCandidateReg.close();
115                 serviceEntityCandidateReg = null;
116             }
117             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
118             if (hasOwnership) {
119                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
120                     try {
121                         serviceCloseFutureList.add(service.closeServiceInstance());
122                     } catch (final RuntimeException e) {
123                         LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
124                                 service.getIdentifier());
125                     }
126                 }
127                 hasOwnership = false;
128             }
129             destroyFuture = Futures.allAsList(serviceCloseFutureList);
130             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
131             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
132         }
133         return destroyFuture;
134     }
135
136     @Override
137     public final void initializationClusterSingletonGroup() {
138         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
139         boolean needReleaseLock = false;
140         boolean needCloseProviderInstance = false;
141         try {
142             clusterLock.acquire();
143             needReleaseLock = true;
144             Verify.verify(serviceGroup.isEmpty());
145             Verify.verify(!hasOwnership);
146             Verify.verify(serviceEntityCandidateReg == null);
147             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
148         } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
149             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
150             needCloseProviderInstance = true;
151             throw new RuntimeException(e);
152         } finally {
153             closeResources(needReleaseLock, needCloseProviderInstance);
154         }
155     }
156
157     @Override
158     public final ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
159         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
160         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
161         boolean needReleaseLock = false;
162         boolean needCloseProviderInstance = false;
163         ClusterSingletonServiceRegistrationDelegator reg = null;
164         try {
165             clusterLock.acquire();
166             needReleaseLock = true;
167             Verify.verify(serviceEntityCandidateReg != null);
168             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
169             serviceGroup.add(reg);
170             if (hasOwnership) {
171                 service.instantiateServiceInstance();
172             }
173         } catch (final RuntimeException | InterruptedException e) {
174             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
175             needCloseProviderInstance = true;
176             throw new RuntimeException(e);
177         } finally {
178             closeResources(needReleaseLock, needCloseProviderInstance);
179         }
180         return reg;
181     }
182
183     @Override
184     public final void unregisterService(final ClusterSingletonService service) {
185         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
186         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
187         boolean needReleaseLock = false;
188         boolean needCloseProviderInstance = false;
189         try {
190             clusterLock.acquire();
191             needReleaseLock = true;
192             if (serviceGroup.size() > 1) {
193                 if (hasOwnership) {
194                     service.closeServiceInstance();
195                 }
196                 serviceGroup.remove(service);
197                 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
198             } else {
199                 needCloseProviderInstance = true;
200             }
201         } catch (final RuntimeException | InterruptedException e) {
202             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
203             needCloseProviderInstance = true;
204             throw new RuntimeException(e);
205         } finally {
206             closeResources(needReleaseLock, needCloseProviderInstance);
207         }
208     }
209
210     @Override
211     public void ownershipChanged(final C ownershipChange) {
212         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
213                 clusterSingletonGroupIdentifier);
214         try {
215             if (ownershipChange.inJeopardy()) {
216                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
217                 lostOwnership();
218                 return;
219             }
220             if (serviceEntity.equals(ownershipChange.getEntity())) {
221                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
222                     /*
223                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
224                      */
225                     tryToTakeOwnership();
226                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
227                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
228                                 .equals(ownershipChange.getState())) {
229                     /*
230                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
231                      */
232                     lostOwnership();
233                 } else {
234                     /* Not needed notifications */
235                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
236                             clusterSingletonGroupIdentifier);
237                 }
238             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
239                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
240                     /*
241                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
242                      */
243                     takeOwnership();
244                 } else {
245                     /* Not needed notifications */
246                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
247                             ownershipChange, clusterSingletonGroupIdentifier);
248                 }
249             } else {
250                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
251             }
252         } catch (final Exception e) {
253             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
254             // TODO : think about close ... is it necessary?
255         }
256     }
257
258     /*
259      * Help method to registered DoubleCandidateEntity. It is first step
260      * before the actual instance take Leadership.
261      */
262     private void tryToTakeOwnership() {
263         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
264         boolean needReleaseLock = false;
265         boolean needCloseProviderInstance = false;
266         try {
267             clusterLock.acquire();
268             needReleaseLock = true;
269             if (serviceEntityCandidateReg != null) {
270                 Verify.verify(asyncCloseEntityCandidateReg == null);
271                 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
272             } else {
273                 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
274             }
275         } catch (final Exception e) {
276             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
277                     clusterSingletonGroupIdentifier, e);
278             needCloseProviderInstance = true;
279         } finally {
280             closeResources(needReleaseLock, needCloseProviderInstance);
281         }
282     }
283
284     /*
285      * Help method calls setupService method for create single cluster-wide service instance.
286      */
287     private void takeOwnership() {
288         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
289         boolean needReleaseLock = false;
290         boolean needCloseProviderInstance = false;
291         try {
292             clusterLock.acquire();
293             needReleaseLock = true;
294             if (serviceEntityCandidateReg != null) {
295                 Verify.verify(asyncCloseEntityCandidateReg != null);
296                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
297                     service.instantiateServiceInstance();
298                 }
299                 hasOwnership = true;
300             } else {
301                 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
302             }
303         } catch (final RuntimeException | InterruptedException e) {
304             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
305                     clusterSingletonGroupIdentifier, e);
306             needCloseProviderInstance = true;
307         } finally {
308             closeResources(needReleaseLock, needCloseProviderInstance);
309         }
310     }
311
312     /*
313      * Help method calls suspendService method for stop this single cluster-wide service instance.
314      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
315      * new election for DoubleCandidateEntity.
316      */
317     private void lostOwnership() {
318         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
319         boolean needReleaseLock = false;
320         boolean needCloseProviderInstance = false;
321         try {
322             clusterLock.acquire();
323             needReleaseLock = true;
324             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
325             if (hasOwnership) {
326                 Verify.verify(asyncCloseEntityCandidateReg != null);
327                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
328                     try {
329                         serviceCloseFutureList.add(service.closeServiceInstance());
330                     } catch (final RuntimeException e) {
331                         LOG.error("Unexpected exception while closing service: {}, resuming with next..",
332                                 service.getIdentifier());
333                     }
334                 }
335                 hasOwnership = false;
336             }
337
338             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
339             if (serviceEntityCandidateReg != null) {
340                 // we don't want to remove this instance from map
341                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
342             } else {
343                 // we have to remove this ClusterSingletonServiceGroup instance from map
344                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
345             }
346             /*
347              * We wish to stop all possible EOS activities before we don't close
348              * a close candidate registration that acts as a guard. So we don't want
349              * to release Semaphore (clusterLock) before we are not fully finished.
350              * Semaphore lock release has to be realized as FutureCallback after a service
351              * instance has fully closed prior to relinquishing service ownership.
352              */
353             needReleaseLock = false;
354         } catch (final InterruptedException e) {
355             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
356                     clusterSingletonGroupIdentifier, e);
357             needCloseProviderInstance = true;
358         } finally {
359             closeResources(needReleaseLock, needCloseProviderInstance);
360         }
361     }
362
363     /*
364      * Help method for finalization every acquired functionality
365      */
366     @GuardedBy("clusterLock")
367     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
368         if (needCloseProvider) {
369             // The Game Over for this ClusterSingletonServiceGroup instance
370             if (serviceEntityCandidateReg != null) {
371                 serviceEntityCandidateReg.close();
372                 serviceEntityCandidateReg = null;
373             }
374             // Remove instance immediately because actual state is follower or initialization
375             if (asyncCloseEntityCandidateReg == null) {
376                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
377             }
378         }
379
380         if (needReleaseLock) {
381             clusterLock.release();
382         }
383     }
384
385     /*
386      * Help method creates FutureCallback for suspend Future
387      */
388     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
389             final boolean isInCloseProcess) {
390         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
391             if (throwable != null) {
392                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
393             } else {
394                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
395             }
396             if (asyncCloseEntityCandidateReg != null) {
397                 asyncCloseEntityCandidateReg.close();
398                 asyncCloseEntityCandidateReg = null;
399             }
400             if (isInCloseProcess) {
401                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
402             }
403             if (semaphore != null) {
404                 semaphore.release();
405             }
406         };
407
408         return new FutureCallback<List<Void>>() {
409
410             @Override
411             public void onSuccess(final List<Void> result) {
412                 closeEntityCandidateRegistration.accept(null);
413             }
414
415             @Override
416             public void onFailure(final Throwable t) {
417                 closeEntityCandidateRegistration.accept(t);
418             }
419         };
420     }
421
422 }