Bug 6710 - Close ClusterSingletonServiceRegistration fix
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Implementation of {@link ClusterSingletonServiceGroup}.
41  *
42  * @param <P> the instance identifier path type
43  * @param <E> the GenericEntity type
44  * @param <C> the GenericEntityOwnershipChange type
45  * @param <G> the GenericEntityOwnershipListener type
46  * @param <S> the GenericEntityOwnershipService type
47  */
48 @VisibleForTesting
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50                                              C extends GenericEntityOwnershipChange<P, E>,
51                                              G extends GenericEntityOwnershipListener<P, C>,
52                                              S extends GenericEntityOwnershipService<P, E, G>>
53         implements ClusterSingletonServiceGroup<P, E, C> {
54
55     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
56
57     private final S entityOwnershipService;
58     private final String clusterSingletonGroupIdentifier;
59     private final Semaphore clusterLock = new Semaphore(1, true);
60
61     /* Entity instances */
62     private final E serviceEntity;
63     private final E doubleCandidateEntity;
64
65     // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
66     // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
67     // FOLLOWER : baseCandidate is registered correctly
68     // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
69     // LEADER : both candidate have mastership from EOS
70     // TERMINATED : service go down
71     @GuardedBy("clusterLock")
72     private boolean hasOwnership = false;
73     @GuardedBy("clusterLock")
74     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
75     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
76
77     /* EOS Candidate Registrations */
78     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
79     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
80
81     /**
82      * Class constructor.
83      *
84      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
85      * @param mainEntity as Entity instance
86      * @param closeEntity as Entity instance
87      * @param entityOwnershipService GenericEntityOwnershipService instance
88      * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
89      */
90     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
91             final E closeEntity, final S entityOwnershipService,
92             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
93         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
94         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
95         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
96         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
97         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
98         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
99         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
100     }
101
102     @SuppressWarnings("checkstyle:IllegalCatch")
103     @Override
104     public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
105         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
106         boolean needReleaseLock = false;
107         final ListenableFuture<List<Void>> destroyFuture;
108         try {
109             needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
110         } catch (final Exception e) {
111             LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
112                     e);
113         } finally {
114             if (serviceEntityCandidateReg != null) {
115                 serviceEntityCandidateReg.close();
116                 serviceEntityCandidateReg = null;
117             }
118             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
119             if (hasOwnership) {
120                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
121                     serviceCloseFutureList.add(service.closeServiceInstance());
122                 }
123                 hasOwnership = false;
124             }
125             destroyFuture = Futures.allAsList(serviceCloseFutureList);
126             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
127             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
128         }
129         return destroyFuture;
130     }
131
132     @SuppressWarnings("checkstyle:IllegalCatch")
133     @Override
134     public void initializationClusterSingletonGroup() {
135         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
136         boolean needReleaseLock = false;
137         boolean needCloseProviderInstance = false;
138         try {
139             clusterLock.acquire();
140             needReleaseLock = true;
141             Verify.verify(serviceGroup.isEmpty());
142             Verify.verify(!hasOwnership);
143             Verify.verify(serviceEntityCandidateReg == null);
144             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
145         } catch (final Exception e) {
146             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
147             needCloseProviderInstance = true;
148             throw new RuntimeException(e);
149         } finally {
150             closeResources(needReleaseLock, needCloseProviderInstance);
151         }
152     }
153
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     @Override
156     public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
157         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
158         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
159         boolean needReleaseLock = false;
160         boolean needCloseProviderInstance = false;
161         ClusterSingletonServiceRegistrationDelegator reg = null;
162         try {
163             clusterLock.acquire();
164             needReleaseLock = true;
165             Verify.verify(serviceEntityCandidateReg != null);
166             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
167             serviceGroup.add(reg);
168             if (hasOwnership) {
169                 service.instantiateServiceInstance();
170             }
171         } catch (final Exception e) {
172             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
173             needCloseProviderInstance = true;
174             throw new RuntimeException(e);
175         } finally {
176             closeResources(needReleaseLock, needCloseProviderInstance);
177         }
178         return reg;
179     }
180
181     @SuppressWarnings("checkstyle:IllegalCatch")
182     @Override
183     public void unregisterService(final ClusterSingletonService service) {
184         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
185         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
186         boolean needReleaseLock = false;
187         boolean needCloseProviderInstance = false;
188         try {
189             clusterLock.acquire();
190             needReleaseLock = true;
191             if (serviceGroup.size() > 1) {
192                 if (hasOwnership) {
193                     service.closeServiceInstance();
194                 }
195                 serviceGroup.remove(service);
196                 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
197             } else {
198                 needCloseProviderInstance = true;
199             }
200         } catch (final Exception e) {
201             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
202             needCloseProviderInstance = true;
203             throw new RuntimeException(e);
204         } finally {
205             closeResources(needReleaseLock, needCloseProviderInstance);
206         }
207     }
208
209     @SuppressWarnings("checkstyle:IllegalCatch")
210     @Override
211     public void ownershipChanged(final C ownershipChange) {
212         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
213                 clusterSingletonGroupIdentifier);
214         try {
215             if (ownershipChange.inJeopardy()) {
216                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
217                 lostOwnership();
218                 return;
219             }
220             if (serviceEntity.equals(ownershipChange.getEntity())) {
221                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
222                     /*
223                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
224                      */
225                     tryToTakeOwnership();
226                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
227                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
228                                 .equals(ownershipChange.getState())) {
229                     /*
230                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
231                      */
232                     lostOwnership();
233                 } else {
234                     /* Not needed notifications */
235                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
236                             clusterSingletonGroupIdentifier);
237                 }
238             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
239                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
240                     /*
241                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
242                      */
243                     takeOwnership();
244                 } else {
245                     /* Not needed notifications */
246                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
247                             ownershipChange, clusterSingletonGroupIdentifier);
248                 }
249             } else {
250                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
251             }
252         } catch (final Exception e) {
253             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
254             // TODO : think about close ... is it necessary?
255         }
256     }
257
258     /*
259      * Help method to registered DoubleCandidateEntity. It is first step
260      * before the actual instance take Leadership.
261      */
262     @SuppressWarnings("checkstyle:IllegalCatch")
263     private void tryToTakeOwnership() {
264         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
265         boolean needReleaseLock = false;
266         boolean needCloseProviderInstance = false;
267         try {
268             clusterLock.acquire();
269             needReleaseLock = true;
270             if (serviceEntityCandidateReg != null) {
271                 Verify.verify(asyncCloseEntityCandidateReg == null);
272                 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
273             } else {
274                 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
275             }
276         } catch (final Exception e) {
277             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
278                     clusterSingletonGroupIdentifier, e);
279             needCloseProviderInstance = true;
280         } finally {
281             closeResources(needReleaseLock, needCloseProviderInstance);
282         }
283     }
284
285     /*
286      * Help method calls setupService method for create single cluster-wide service instance.
287      */
288     @SuppressWarnings("checkstyle:IllegalCatch")
289     private void takeOwnership() {
290         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
291         boolean needReleaseLock = false;
292         boolean needCloseProviderInstance = false;
293         try {
294             clusterLock.acquire();
295             needReleaseLock = true;
296             if (serviceEntityCandidateReg != null) {
297                 Verify.verify(asyncCloseEntityCandidateReg != null);
298                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
299                     service.instantiateServiceInstance();
300                 }
301                 hasOwnership = true;
302             } else {
303                 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
304             }
305         } catch (final Exception e) {
306             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
307                     clusterSingletonGroupIdentifier, e);
308             needCloseProviderInstance = true;
309         } finally {
310             closeResources(needReleaseLock, needCloseProviderInstance);
311         }
312     }
313
314     /*
315      * Help method calls suspendService method for stop this single cluster-wide service instance.
316      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
317      * new election for DoubleCandidateEntity.
318      */
319     @SuppressWarnings("checkstyle:IllegalCatch")
320     private void lostOwnership() {
321         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
322         boolean needReleaseLock = false;
323         boolean needCloseProviderInstance = false;
324         try {
325             clusterLock.acquire();
326             needReleaseLock = true;
327             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
328             if (hasOwnership) {
329                 Verify.verify(asyncCloseEntityCandidateReg != null);
330                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
331                     serviceCloseFutureList.add(service.closeServiceInstance());
332                 }
333                 hasOwnership = false;
334             }
335
336             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
337             if (serviceEntityCandidateReg != null) {
338                 // we don't want to remove this instance from map
339                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
340             } else {
341                 // we have to remove this ClusterSingletonServiceGroup instance from map
342                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
343             }
344             /*
345              * We wish to stop all possible EOS activities before we don't close
346              * a close candidate registration that acts as a guard. So we don't want
347              * to release Semaphore (clusterLock) before we are not fully finished.
348              * Semaphore lock release has to be realized as FutureCallback after a service
349              * instance has fully closed prior to relinquishing service ownership.
350              */
351             needReleaseLock = false;
352         } catch (final Exception e) {
353             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
354                     clusterSingletonGroupIdentifier, e);
355             needCloseProviderInstance = true;
356         } finally {
357             closeResources(needReleaseLock, needCloseProviderInstance);
358         }
359     }
360
361     /*
362      * Help method for finalization every acquired functionality
363      */
364     @GuardedBy("clusterLock")
365     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
366         if (needCloseProvider) {
367             // The Game Over for this ClusterSingletonServiceGroup instance
368             if (serviceEntityCandidateReg != null) {
369                 serviceEntityCandidateReg.close();
370                 serviceEntityCandidateReg = null;
371             }
372             // Remove instance immediately because actual state is follower or initialization
373             if (asyncCloseEntityCandidateReg == null) {
374                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
375             }
376         }
377
378         if (needReleaseLock) {
379             clusterLock.release();
380         }
381     }
382
383     /*
384      * Help method creates FutureCallback for suspend Future
385      */
386     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
387             final boolean isInCloseProcess) {
388         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
389             if (throwable != null) {
390                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
391             } else {
392                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
393             }
394             if (asyncCloseEntityCandidateReg != null) {
395                 asyncCloseEntityCandidateReg.close();
396                 asyncCloseEntityCandidateReg = null;
397             }
398             if (isInCloseProcess) {
399                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
400             }
401             if (semaphore != null) {
402                 semaphore.release();
403             }
404         };
405
406         return new FutureCallback<List<Void>>() {
407
408             @Override
409             public void onSuccess(final List<Void> result) {
410                 closeEntityCandidateRegistration.accept(null);
411             }
412
413             @Override
414             public void onFailure(final Throwable throwable) {
415                 closeEntityCandidateRegistration.accept(throwable);
416             }
417         };
418     }
419
420 }