af18888ef24dddd43799754d35041df6cf276887
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Implementation of {@link ClusterSingletonServiceGroup}
41  *
42  * @param <P> the instance identifier path type
43  * @param <E> the GenericEntity type
44  * @param <C> the GenericEntityOwnershipChange type
45  * @param <G> the GenericEntityOwnershipListener type
46  * @param <S> the GenericEntityOwnershipService type
47  */
48 @VisibleForTesting
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50                                              C extends GenericEntityOwnershipChange<P, E>,
51                                              G extends GenericEntityOwnershipListener<P, C>,
52                                              S extends GenericEntityOwnershipService<P, E, G>>
53         implements ClusterSingletonServiceGroup<P, E, C> {
54
55     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
56
57     private final S entityOwnershipService;
58     private final String clusterSingletonGroupIdentifier;
59     private final Semaphore clusterLock = new Semaphore(1);
60
61     /* Entity instances */
62     private final E serviceEntity;
63     private final E doubleCandidateEntity;
64
65     @GuardedBy("clusterLock")
66     private boolean hasOwnership = false;
67     @GuardedBy("clusterLock")
68     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
69     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
70
71     /* EOS Candidate Registrations */
72     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
73     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
74
75     /**
76      * Class constructor
77      *
78      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
79      * @param mainEntity
80      * @param closeEntity
81      * @param entityOwnershipService
82      */
83     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
84             final E closeEntity, final S entityOwnershipService,
85             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
86         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
87         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
88         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
89         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
90         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
91         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
92         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
93     }
94
95     @Override
96     public final ListenableFuture<List<Void>> closeClusterSingletonGroup() {
97         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
98         boolean needReleaseLock = false;
99         final ListenableFuture<List<Void>> destroyFuture;
100         try {
101             needReleaseLock = clusterLock.tryAcquire(10, TimeUnit.SECONDS);
102         } catch (final Exception e) {
103             LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier, e);
104         } finally {
105             if (serviceEntityCandidateReg != null) {
106                 serviceEntityCandidateReg.close();
107                 serviceEntityCandidateReg = null;
108             }
109             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
110             if (hasOwnership) {
111                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
112                     serviceCloseFutureList.add(service.closeServiceInstance());
113                 }
114             }
115             destroyFuture = Futures.allAsList(serviceCloseFutureList);
116             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
117             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease));
118         }
119         return destroyFuture;
120     }
121
122     @Override
123     public final void initializationClusterSingletonGroup() {
124         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
125         boolean needReleaseLock = false;
126         boolean needCloseProviderInstance = false;
127         try {
128             clusterLock.acquire();
129             needReleaseLock = true;
130             Verify.verify(serviceGroup.isEmpty());
131             Verify.verify(!hasOwnership);
132             Verify.verify(serviceEntityCandidateReg == null);
133             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
134         } catch (final Exception e) {
135             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
136             needCloseProviderInstance = true;
137         } finally {
138             closeResources(needReleaseLock, needCloseProviderInstance);
139         }
140     }
141
142     @Override
143     public final ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
144         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
145         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
146         boolean needReleaseLock = false;
147         boolean needCloseProviderInstance = false;
148         ClusterSingletonServiceRegistrationDelegator reg = null;
149         try {
150             clusterLock.acquire();
151             needReleaseLock = true;
152             Verify.verify(serviceEntityCandidateReg != null);
153             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
154             serviceGroup.add(reg);
155             if (hasOwnership) {
156                 service.instantiateServiceInstance();
157             }
158         } catch (final Exception e) {
159             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
160             needCloseProviderInstance = true;
161         } finally {
162             closeResources(needReleaseLock, needCloseProviderInstance);
163         }
164         return reg;
165     }
166
167     @Override
168     public final void unregisterService(final ClusterSingletonService service) {
169         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
170         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
171         boolean needReleaseLock = false;
172         boolean needCloseProviderInstance = false;
173         try {
174             clusterLock.acquire();
175             needReleaseLock = true;
176             serviceGroup.remove(service);
177             if (hasOwnership) {
178                 service.closeServiceInstance();
179             }
180         } catch (final Exception e) {
181             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
182             needCloseProviderInstance = true;
183         } finally {
184             closeResources(needReleaseLock, needCloseProviderInstance);
185             if (serviceGroup.isEmpty()) {
186                 this.closeClusterSingletonGroup();
187             }
188         }
189     }
190
191     @Override
192     public final void ownershipChanged(final C ownershipChange) {
193         LOG.debug("Ownership change {} for ClusterSingletonServiceGrou {}", ownershipChange,
194                 clusterSingletonGroupIdentifier);
195         try {
196             if (ownershipChange.inJeopardy()) {
197                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
198                 lostOwnership();
199                 return;
200             }
201             if (serviceEntity.equals(ownershipChange.getEntity())) {
202                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
203                     /*
204                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
205                      */
206                     tryToTakeOwnership();
207                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
208                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
209                                 .equals(ownershipChange.getState())) {
210                     /*
211                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
212                      */
213                     lostOwnership();
214                 } else {
215                     /* Not needed notifications */
216                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
217                             clusterSingletonGroupIdentifier);
218                 }
219             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
220                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
221                     /*
222                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
223                      */
224                     takeOwnership();
225                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
226                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
227                                 .equals(ownershipChange.getState())) {
228                     /*
229                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
230                      */
231                     LOG.warn("Unexpected lost doubleCandidate {} leadership. Close service instance {}",
232                             doubleCandidateEntity, clusterSingletonGroupIdentifier);
233                     lostOwnership();
234                 } else {
235                     /* Not needed notifications */
236                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
237                             ownershipChange, clusterSingletonGroupIdentifier);
238                 }
239             } else {
240                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
241             }
242         } catch (final Exception e) {
243             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
244             // TODO : think about close ... is it necessary?
245         }
246     }
247
248     /*
249      * Help method to registered DoubleCandidateEntity. It is first step
250      * before the actual instance take Leadership.
251      */
252     private void tryToTakeOwnership() {
253         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
254         boolean needReleaseLock = false;
255         boolean needCloseProviderInstance = false;
256         try {
257             clusterLock.acquire();
258             needReleaseLock = true;
259             Verify.verify(serviceEntityCandidateReg != null);
260             Verify.verify(asyncCloseEntityCandidateReg == null);
261             asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
262         } catch (final Exception e) {
263             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
264                     clusterSingletonGroupIdentifier, e);
265             needCloseProviderInstance = true;
266         } finally {
267             closeResources(needReleaseLock, needCloseProviderInstance);
268         }
269     }
270
271     /*
272      * Help method calls setupService method for create single cluster-wide service instance.
273      */
274     private void takeOwnership() {
275         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
276         boolean needReleaseLock = false;
277         boolean needCloseProviderInstance = false;
278         try {
279             clusterLock.acquire();
280             needReleaseLock = true;
281             Verify.verify(serviceEntityCandidateReg != null);
282             Verify.verify(asyncCloseEntityCandidateReg != null);
283             for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
284                 service.instantiateServiceInstance();
285             }
286             hasOwnership = true;
287         } catch (final Exception e) {
288             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
289                     clusterSingletonGroupIdentifier, e);
290             needCloseProviderInstance = true;
291         } finally {
292             closeResources(needReleaseLock, needCloseProviderInstance);
293         }
294     }
295
296     /*
297      * Help method calls suspendService method for stop this single cluster-wide service instance.
298      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
299      * new election for DoubleCandidateEntity.
300      */
301     private void lostOwnership() {
302         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
303         boolean needReleaseLock = false;
304         boolean needCloseProviderInstance = false;
305         try {
306             clusterLock.acquire();
307             needReleaseLock = true;
308             Verify.verify(serviceEntityCandidateReg != null);
309             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
310             if (hasOwnership) {
311                 Verify.verify(asyncCloseEntityCandidateReg != null);
312                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
313                     serviceCloseFutureList.add(service.closeServiceInstance());
314                 }
315             }
316
317             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
318             Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock));
319             /*
320              * We wish to stop all possible EOS activities before we don't close
321              * a close candidate registration that acts as a guard. So we don't want
322              * to release Semaphore (clusterLock) before we are not fully finished.
323              * Semaphore lock release has to be realized as FutureCallback after a service
324              * instance has fully closed prior to relinquishing service ownership.
325              */
326             needReleaseLock = false;
327         } catch (final Exception e) {
328             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
329                     clusterSingletonGroupIdentifier, e);
330             needCloseProviderInstance = true;
331         } finally {
332             closeResources(needReleaseLock, needCloseProviderInstance);
333         }
334     }
335
336     /*
337      * Help method for finalization every acquired functionality
338      */
339     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
340         if (needReleaseLock) {
341             clusterLock.release();
342         }
343         if (needCloseProvider) {
344             final ListenableFuture<List<Void>> closeFutureList = this.closeClusterSingletonGroup();
345             Futures.addCallback(closeFutureList, new FutureCallback<List<Void>>() {
346
347                 @Override
348                 public void onSuccess(final List<Void> result) {
349                     removeThisGroupFromProvider(null);
350                 }
351
352                 @Override
353                 public void onFailure(final Throwable t) {
354                     removeThisGroupFromProvider(t);
355                 }
356             });
357         }
358     }
359
360     /*
361      * Help method for final ClusterSingletonGroup removing
362      */
363     protected final void removeThisGroupFromProvider(@Nullable final Throwable t) {
364         LOG.debug("Removing ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
365         if (t != null) {
366             LOG.warn("Unexpected problem by closingResources ClusterSingletonServiceGroup {}",
367                     clusterSingletonGroupIdentifier);
368         }
369         allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
370     }
371
372     /*
373      * Help method creates FutureCallback for suspend Future
374      */
375     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore) {
376         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable t) -> {
377             if (t != null) {
378                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, t);
379             } else {
380                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
381             }
382             if (asyncCloseEntityCandidateReg != null) {
383                 asyncCloseEntityCandidateReg.close();
384                 asyncCloseEntityCandidateReg = null;
385             }
386             if (semaphore != null) {
387                 semaphore.release();
388             }
389             allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
390         };
391
392         return new FutureCallback<List<Void>>() {
393
394             @Override
395             public void onSuccess(final List<Void> result) {
396                 closeEntityCandidateRegistration.accept(null);
397             }
398
399             @Override
400             public void onFailure(final Throwable t) {
401                 closeEntityCandidateRegistration.accept(t);
402             }
403         };
404     }
405
406 }