Bug 6710 - Close ClusterSingletonServiceRegistration fix
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Implementation of {@link ClusterSingletonServiceGroup}
41  *
42  * @param <P> the instance identifier path type
43  * @param <E> the GenericEntity type
44  * @param <C> the GenericEntityOwnershipChange type
45  * @param <G> the GenericEntityOwnershipListener type
46  * @param <S> the GenericEntityOwnershipService type
47  */
48 @VisibleForTesting
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50                                              C extends GenericEntityOwnershipChange<P, E>,
51                                              G extends GenericEntityOwnershipListener<P, C>,
52                                              S extends GenericEntityOwnershipService<P, E, G>>
53         implements ClusterSingletonServiceGroup<P, E, C> {
54
55     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
56
57     private final S entityOwnershipService;
58     private final String clusterSingletonGroupIdentifier;
59     private final Semaphore clusterLock = new Semaphore(1, true);
60
61     /* Entity instances */
62     private final E serviceEntity;
63     private final E doubleCandidateEntity;
64
65     // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
66     // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
67     // FOLLOWER : baseCandidate is registered correctly
68     // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
69     // LEADER : both candidate have mastership from EOS
70     // TERMINATED : service go down
71     @GuardedBy("clusterLock")
72     private boolean hasOwnership = false;
73     @GuardedBy("clusterLock")
74     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
75     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
76
77     /* EOS Candidate Registrations */
78     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
79     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
80
81     /**
82      * Class constructor
83      *
84      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
85      * @param mainEntity
86      * @param closeEntity
87      * @param entityOwnershipService
88      */
89     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
90             final E closeEntity, final S entityOwnershipService,
91             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
92         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
93         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
94         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
95         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
96         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
97         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
98         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
99     }
100
101     @Override
102     public final ListenableFuture<List<Void>> closeClusterSingletonGroup() {
103         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
104         boolean needReleaseLock = false;
105         final ListenableFuture<List<Void>> destroyFuture;
106         try {
107             needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
108         } catch (final Exception e) {
109             LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
110                     e);
111         } finally {
112             if (serviceEntityCandidateReg != null) {
113                 serviceEntityCandidateReg.close();
114                 serviceEntityCandidateReg = null;
115             }
116             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
117             if (hasOwnership) {
118                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
119                     serviceCloseFutureList.add(service.closeServiceInstance());
120                 }
121                 hasOwnership = false;
122             }
123             destroyFuture = Futures.allAsList(serviceCloseFutureList);
124             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
125             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
126         }
127         return destroyFuture;
128     }
129
130     @Override
131     public final void initializationClusterSingletonGroup() {
132         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
133         boolean needReleaseLock = false;
134         boolean needCloseProviderInstance = false;
135         try {
136             clusterLock.acquire();
137             needReleaseLock = true;
138             Verify.verify(serviceGroup.isEmpty());
139             Verify.verify(!hasOwnership);
140             Verify.verify(serviceEntityCandidateReg == null);
141             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
142         } catch (final Exception e) {
143             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
144             needCloseProviderInstance = true;
145             throw new RuntimeException(e);
146         } finally {
147             closeResources(needReleaseLock, needCloseProviderInstance);
148         }
149     }
150
151     @Override
152     public final ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
153         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
154         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
155         boolean needReleaseLock = false;
156         boolean needCloseProviderInstance = false;
157         ClusterSingletonServiceRegistrationDelegator reg = null;
158         try {
159             clusterLock.acquire();
160             needReleaseLock = true;
161             Verify.verify(serviceEntityCandidateReg != null);
162             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
163             serviceGroup.add(reg);
164             if (hasOwnership) {
165                 service.instantiateServiceInstance();
166             }
167         } catch (final Exception e) {
168             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
169             needCloseProviderInstance = true;
170             throw new RuntimeException(e);
171         } finally {
172             closeResources(needReleaseLock, needCloseProviderInstance);
173         }
174         return reg;
175     }
176
177     @Override
178     public final void unregisterService(final ClusterSingletonService service) {
179         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
180         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
181         boolean needReleaseLock = false;
182         boolean needCloseProviderInstance = false;
183         try {
184             clusterLock.acquire();
185             needReleaseLock = true;
186             if (serviceGroup.size() > 1) {
187                 if (hasOwnership) {
188                     service.closeServiceInstance();
189                 }
190                 serviceGroup.remove(service);
191                 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
192             } else {
193                 needCloseProviderInstance = true;
194             }
195         } catch (final Exception e) {
196             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
197             needCloseProviderInstance = true;
198             throw new RuntimeException(e);
199         } finally {
200             closeResources(needReleaseLock, needCloseProviderInstance);
201         }
202     }
203
204     @Override
205     public void ownershipChanged(final C ownershipChange) {
206         LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
207                 clusterSingletonGroupIdentifier);
208         try {
209             if (ownershipChange.inJeopardy()) {
210                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
211                 lostOwnership();
212                 return;
213             }
214             if (serviceEntity.equals(ownershipChange.getEntity())) {
215                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
216                     /*
217                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
218                      */
219                     tryToTakeOwnership();
220                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
221                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
222                                 .equals(ownershipChange.getState())) {
223                     /*
224                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
225                      */
226                     lostOwnership();
227                 } else {
228                     /* Not needed notifications */
229                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
230                             clusterSingletonGroupIdentifier);
231                 }
232             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
233                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
234                     /*
235                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
236                      */
237                     takeOwnership();
238                 } else {
239                     /* Not needed notifications */
240                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
241                             ownershipChange, clusterSingletonGroupIdentifier);
242                 }
243             } else {
244                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
245             }
246         } catch (final Exception e) {
247             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
248             // TODO : think about close ... is it necessary?
249         }
250     }
251
252     /*
253      * Help method to registered DoubleCandidateEntity. It is first step
254      * before the actual instance take Leadership.
255      */
256     private void tryToTakeOwnership() {
257         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
258         boolean needReleaseLock = false;
259         boolean needCloseProviderInstance = false;
260         try {
261             clusterLock.acquire();
262             needReleaseLock = true;
263             if (serviceEntityCandidateReg != null) {
264                 Verify.verify(asyncCloseEntityCandidateReg == null);
265                 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
266             } else {
267                 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
268             }
269         } catch (final Exception e) {
270             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
271                     clusterSingletonGroupIdentifier, e);
272             needCloseProviderInstance = true;
273         } finally {
274             closeResources(needReleaseLock, needCloseProviderInstance);
275         }
276     }
277
278     /*
279      * Help method calls setupService method for create single cluster-wide service instance.
280      */
281     private void takeOwnership() {
282         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
283         boolean needReleaseLock = false;
284         boolean needCloseProviderInstance = false;
285         try {
286             clusterLock.acquire();
287             needReleaseLock = true;
288             if (serviceEntityCandidateReg != null) {
289                 Verify.verify(asyncCloseEntityCandidateReg != null);
290                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
291                     service.instantiateServiceInstance();
292                 }
293                 hasOwnership = true;
294             } else {
295                 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
296             }
297         } catch (final Exception e) {
298             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
299                     clusterSingletonGroupIdentifier, e);
300             needCloseProviderInstance = true;
301         } finally {
302             closeResources(needReleaseLock, needCloseProviderInstance);
303         }
304     }
305
306     /*
307      * Help method calls suspendService method for stop this single cluster-wide service instance.
308      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
309      * new election for DoubleCandidateEntity.
310      */
311     private void lostOwnership() {
312         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
313         boolean needReleaseLock = false;
314         boolean needCloseProviderInstance = false;
315         try {
316             clusterLock.acquire();
317             needReleaseLock = true;
318             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
319             if (hasOwnership) {
320                 Verify.verify(asyncCloseEntityCandidateReg != null);
321                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
322                     serviceCloseFutureList.add(service.closeServiceInstance());
323                 }
324                 hasOwnership = false;
325             }
326
327             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
328             if (serviceEntityCandidateReg != null) {
329                 // we don't want to remove this instance from map
330                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
331             } else {
332                 // we have to remove this ClusterSingletonServiceGroup instance from map
333                 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
334             }
335             /*
336              * We wish to stop all possible EOS activities before we don't close
337              * a close candidate registration that acts as a guard. So we don't want
338              * to release Semaphore (clusterLock) before we are not fully finished.
339              * Semaphore lock release has to be realized as FutureCallback after a service
340              * instance has fully closed prior to relinquishing service ownership.
341              */
342             needReleaseLock = false;
343         } catch (final Exception e) {
344             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
345                     clusterSingletonGroupIdentifier, e);
346             needCloseProviderInstance = true;
347         } finally {
348             closeResources(needReleaseLock, needCloseProviderInstance);
349         }
350     }
351
352     /*
353      * Help method for finalization every acquired functionality
354      */
355     @GuardedBy("clusterLock")
356     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
357         if (needCloseProvider) {
358             // The Game Over for this ClusterSingletonServiceGroup instance
359             if (serviceEntityCandidateReg != null) {
360                 serviceEntityCandidateReg.close();
361                 serviceEntityCandidateReg = null;
362             }
363             // Remove instance immediately because actual state is follower or initialization
364             if (asyncCloseEntityCandidateReg == null) {
365                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
366             }
367         }
368
369         if (needReleaseLock) {
370             clusterLock.release();
371         }
372     }
373
374     /*
375      * Help method creates FutureCallback for suspend Future
376      */
377     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
378             final boolean isInCloseProcess) {
379         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
380             if (throwable != null) {
381                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
382             } else {
383                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
384             }
385             if (asyncCloseEntityCandidateReg != null) {
386                 asyncCloseEntityCandidateReg.close();
387                 asyncCloseEntityCandidateReg = null;
388             }
389             if (isInCloseProcess) {
390                 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
391             }
392             if (semaphore != null) {
393                 semaphore.release();
394             }
395         };
396
397         return new FutureCallback<List<Void>>() {
398
399             @Override
400             public void onSuccess(final List<Void> result) {
401                 closeEntityCandidateRegistration.accept(null);
402             }
403
404             @Override
405             public void onFailure(final Throwable t) {
406                 closeEntityCandidateRegistration.accept(t);
407             }
408         };
409     }
410
411 }