BUG-7222: Improve ClusterSingletonService error handling.
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.yangtools.concepts.Path;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Implementation of {@link ClusterSingletonServiceGroup}.
42  *
43  * @param <P> the instance identifier path type
44  * @param <E> the GenericEntity type
45  * @param <C> the GenericEntityOwnershipChange type
46  * @param <G> the GenericEntityOwnershipListener type
47  * @param <S> the GenericEntityOwnershipService type
48  */
49 @VisibleForTesting
50 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
51                                              C extends GenericEntityOwnershipChange<P, E>,
52                                              G extends GenericEntityOwnershipListener<P, C>,
53                                              S extends GenericEntityOwnershipService<P, E, G>>
54         implements ClusterSingletonServiceGroup<P, E, C> {
55
56     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
57
58     private final S entityOwnershipService;
59     private final String clusterSingletonGroupIdentifier;
60     private final Semaphore clusterLock = new Semaphore(1, true);
61
62     /* Entity instances */
63     private final E serviceEntity;
64     private final E doubleCandidateEntity;
65
    // TODO: rewrite this as a StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
    // INITIALIZED : baseCandidate is registered and we are waiting for the first EOS response (do we really need it?)
    // FOLLOWER : baseCandidate is registered correctly
    // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
    // LEADER : both candidates have mastership from EOS
    // TERMINATED : the service is going down
72     @GuardedBy("clusterLock")
73     private boolean hasOwnership = false;
74     @GuardedBy("clusterLock")
75     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
76     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
77
78     /* EOS Candidate Registrations */
79     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
80     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
81
82     /**
83      * Class constructor.
84      *
85      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
86      * @param mainEntity as Entity instance
87      * @param closeEntity as Entity instance
88      * @param entityOwnershipService GenericEntityOwnershipService instance
89      * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
90      */
91     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
92             final E closeEntity, final S entityOwnershipService,
93             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
94         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
95         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
96         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
97         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
98         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
99         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
100         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
101     }
102
103     @SuppressWarnings("checkstyle:IllegalCatch")
104     @Override
105     public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
106         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
107         boolean needReleaseLock = false;
108         final ListenableFuture<List<Void>> destroyFuture;
109         try {
110             needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
111         } catch (final Exception e) {
112             LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
113                     e);
114         } finally {
115             if (serviceEntityCandidateReg != null) {
116                 serviceEntityCandidateReg.close();
117                 serviceEntityCandidateReg = null;
118             }
119             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
120             if (hasOwnership) {
121                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
122                     try {
123                         serviceCloseFutureList.add(service.closeServiceInstance());
124                     } catch (final RuntimeException e) {
125                         LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
126                                 service.getIdentifier());
127                     }
128                 }
129                 hasOwnership = false;
130             }
131             destroyFuture = Futures.allAsList(serviceCloseFutureList);
132             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
133             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
134         }
135         return destroyFuture;
136     }
137
138     @SuppressWarnings("checkstyle:IllegalCatch")
139     @Override
140     public void initializationClusterSingletonGroup() {
141         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
142         boolean needReleaseLock = false;
143         boolean needCloseProviderInstance = false;
144         try {
145             clusterLock.acquire();
146             needReleaseLock = true;
147             Verify.verify(serviceGroup.isEmpty());
148             Verify.verify(!hasOwnership);
149             Verify.verify(serviceEntityCandidateReg == null);
150             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
151         } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
152             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
153             needCloseProviderInstance = true;
154             throw new RuntimeException(e);
155         } finally {
156             closeResources(needReleaseLock, needCloseProviderInstance);
157         }
158     }
159
160     @SuppressWarnings("checkstyle:IllegalCatch")
161     @Override
162     public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
163         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
164         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
165         boolean needReleaseLock = false;
166         boolean needCloseProviderInstance = false;
167         ClusterSingletonServiceRegistrationDelegator reg = null;
168         try {
169             clusterLock.acquire();
170             needReleaseLock = true;
171             Verify.verify(serviceEntityCandidateReg != null);
172             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
173             serviceGroup.add(reg);
174             if (hasOwnership) {
175                 service.instantiateServiceInstance();
176             }
177         } catch (final RuntimeException | InterruptedException e) {
178             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
179             needCloseProviderInstance = true;
180             throw new RuntimeException(e);
181         } finally {
182             closeResources(needReleaseLock, needCloseProviderInstance);
183         }
184         return reg;
185     }
186
187     @SuppressWarnings("checkstyle:IllegalCatch")
188     @Override
189     public void unregisterService(final ClusterSingletonService service) {
190         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
191         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
192         boolean needReleaseLock = false;
193         boolean needCloseProviderInstance = false;
194         try {
195             clusterLock.acquire();
196             needReleaseLock = true;
197             if (serviceGroup.size() > 1) {
198                 if (hasOwnership) {
199                     service.closeServiceInstance();
200                 }
201                 serviceGroup.remove(service);
202                 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
203             } else {
204                 needCloseProviderInstance = true;
205             }
206         } catch (final RuntimeException | InterruptedException e) {
207             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
208             needCloseProviderInstance = true;
209             throw new RuntimeException(e);
210         } finally {
211             closeResources(needReleaseLock, needCloseProviderInstance);
212         }
213     }
214
215     @SuppressWarnings("checkstyle:IllegalCatch")
216     @Override
217     public void ownershipChanged(final C ownershipChange) {
218         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
219                 clusterSingletonGroupIdentifier);
220         try {
221             if (ownershipChange.inJeopardy()) {
222                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
223                 lostOwnership();
224                 return;
225             }
226             if (serviceEntity.equals(ownershipChange.getEntity())) {
227                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
228                     /*
229                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
230                      */
231                     tryToTakeOwnership();
232                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
233                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
234                                 .equals(ownershipChange.getState())) {
235                     /*
236                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
237                      */
238                     lostOwnership();
239                 } else {
240                     /* Not needed notifications */
241                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
242                             clusterSingletonGroupIdentifier);
243                 }
244             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
245                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
246                     /*
247                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
248                      */
249                     takeOwnership();
250                 } else {
251                     /* Not needed notifications */
252                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
253                             ownershipChange, clusterSingletonGroupIdentifier);
254                 }
255             } else {
256                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
257             }
258         } catch (final Exception e) {
259             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
260             // TODO : think about close ... is it necessary?
261         }
262     }
263
264     /*
265      * Help method to registered DoubleCandidateEntity. It is first step
266      * before the actual instance take Leadership.
267      */
268     @SuppressWarnings("checkstyle:IllegalCatch")
269     private void tryToTakeOwnership() {
270         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
271         boolean needReleaseLock = false;
272         boolean needCloseProviderInstance = false;
273         try {
274             clusterLock.acquire();
275             needReleaseLock = true;
276             if (serviceEntityCandidateReg != null) {
277                 Verify.verify(asyncCloseEntityCandidateReg == null);
278                 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
279             } else {
280                 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
281             }
282         } catch (final Exception e) {
283             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
284                     clusterSingletonGroupIdentifier, e);
285             needCloseProviderInstance = true;
286         } finally {
287             closeResources(needReleaseLock, needCloseProviderInstance);
288         }
289     }
290
291     /*
292      * Help method calls setupService method for create single cluster-wide service instance.
293      */
294     @SuppressWarnings("checkstyle:IllegalCatch")
295     private void takeOwnership() {
296         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
297         boolean needReleaseLock = false;
298         boolean needCloseProviderInstance = false;
299         try {
300             clusterLock.acquire();
301             needReleaseLock = true;
302             if (serviceEntityCandidateReg != null) {
303                 Verify.verify(asyncCloseEntityCandidateReg != null);
304                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
305                     service.instantiateServiceInstance();
306                 }
307                 hasOwnership = true;
308             } else {
309                 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
310             }
311         } catch (final RuntimeException | InterruptedException e) {
312             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
313                     clusterSingletonGroupIdentifier, e);
314             needCloseProviderInstance = true;
315         } finally {
316             closeResources(needReleaseLock, needCloseProviderInstance);
317         }
318     }
319
320     /*
321      * Help method calls suspendService method for stop this single cluster-wide service instance.
322      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
323      * new election for DoubleCandidateEntity.
324      */
325     @SuppressWarnings("checkstyle:IllegalCatch")
326     private void lostOwnership() {
327         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
328         boolean needReleaseLock = false;
329         boolean needCloseProviderInstance = false;
330         try {
331             clusterLock.acquire();
332             needReleaseLock = true;
333             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
334             if (hasOwnership) {
335                 Verify.verify(asyncCloseEntityCandidateReg != null);
336                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
337                     try {
338                         serviceCloseFutureList.add(service.closeServiceInstance());
339                     } catch (final RuntimeException e) {
340                         LOG.error("Unexpected exception while closing service: {}, resuming with next..",
341                                 service.getIdentifier());
342                     }
343                 }
344                 hasOwnership = false;
345             }
346
347             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
348             if (serviceEntityCandidateReg != null) {
349                 // we don't want to remove this instance from map
350                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
351             } else {
352                 // we have to remove this ClusterSingletonServiceGroup instance from map
353                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
354             }
355             /*
356              * We wish to stop all possible EOS activities before we don't close
357              * a close candidate registration that acts as a guard. So we don't want
358              * to release Semaphore (clusterLock) before we are not fully finished.
359              * Semaphore lock release has to be realized as FutureCallback after a service
360              * instance has fully closed prior to relinquishing service ownership.
361              */
362             needReleaseLock = false;
363         } catch (final InterruptedException e) {
364             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
365                     clusterSingletonGroupIdentifier, e);
366             needCloseProviderInstance = true;
367         } finally {
368             closeResources(needReleaseLock, needCloseProviderInstance);
369         }
370     }
371
372     /*
373      * Help method for finalization every acquired functionality
374      */
375     @GuardedBy("clusterLock")
376     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
377         if (needCloseProvider) {
378             // The Game Over for this ClusterSingletonServiceGroup instance
379             if (serviceEntityCandidateReg != null) {
380                 serviceEntityCandidateReg.close();
381                 serviceEntityCandidateReg = null;
382             }
383             // Remove instance immediately because actual state is follower or initialization
384             if (asyncCloseEntityCandidateReg == null) {
385                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
386             }
387         }
388
389         if (needReleaseLock) {
390             clusterLock.release();
391         }
392     }
393
394     /*
395      * Help method creates FutureCallback for suspend Future
396      */
397     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
398             final boolean isInCloseProcess) {
399         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
400             if (throwable != null) {
401                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
402             } else {
403                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
404             }
405             if (asyncCloseEntityCandidateReg != null) {
406                 asyncCloseEntityCandidateReg.close();
407                 asyncCloseEntityCandidateReg = null;
408             }
409             if (isInCloseProcess) {
410                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
411             }
412             if (semaphore != null) {
413                 semaphore.release();
414             }
415         };
416
417         return new FutureCallback<List<Void>>() {
418
419             @Override
420             public void onSuccess(final List<Void> result) {
421                 closeEntityCandidateRegistration.accept(null);
422             }
423
424             @Override
425             public void onFailure(final Throwable throwable) {
426                 closeEntityCandidateRegistration.accept(throwable);
427             }
428         };
429     }
430
431 }