e335140bca4e6c64cf194233060713fa0b4a4026
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / ClusterSingletonServiceGroupImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Implementation of {@link ClusterSingletonServiceGroup}.
41  *
42  * @param <P> the instance identifier path type
43  * @param <E> the GenericEntity type
44  * @param <C> the GenericEntityOwnershipChange type
45  * @param <G> the GenericEntityOwnershipListener type
46  * @param <S> the GenericEntityOwnershipService type
47  */
48 @VisibleForTesting
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50                                              C extends GenericEntityOwnershipChange<P, E>,
51                                              G extends GenericEntityOwnershipListener<P, C>,
52                                              S extends GenericEntityOwnershipService<P, E, G>>
53         implements ClusterSingletonServiceGroup<P, E, C> {
54
55     private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
56
57     private final S entityOwnershipService;
58     private final String clusterSingletonGroupIdentifier;
59     private final Semaphore clusterLock = new Semaphore(1);
60
61     /* Entity instances */
62     private final E serviceEntity;
63     private final E doubleCandidateEntity;
64
65     @GuardedBy("clusterLock")
66     private boolean hasOwnership = false;
67     @GuardedBy("clusterLock")
68     private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
69     private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
70
71     /* EOS Candidate Registrations */
72     private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
73     private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
74
75     /**
76      * Class constructor.
77      *
78      * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
79      * @param mainEntity as Entity instance
80      * @param closeEntity as Entity instance
81      * @param entityOwnershipService GenericEntityOwnershipService instance
82      * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
83      */
84     ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
85             final E closeEntity, final S entityOwnershipService,
86             final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
87         LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
88         Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
89         this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
90         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
91         this.serviceEntity = Preconditions.checkNotNull(mainEntity);
92         this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
93         this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
94     }
95
96     @SuppressWarnings("checkstyle:IllegalCatch")
97     @Override
98     public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
99         LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
100         boolean needReleaseLock = false;
101         final ListenableFuture<List<Void>> destroyFuture;
102         try {
103             needReleaseLock = clusterLock.tryAcquire(10, TimeUnit.SECONDS);
104         } catch (final Exception e) {
105             LOG.warn("Unexpected Exception for service Provider {} in closing phase.",
106                     clusterSingletonGroupIdentifier, e);
107         } finally {
108             if (serviceEntityCandidateReg != null) {
109                 serviceEntityCandidateReg.close();
110                 serviceEntityCandidateReg = null;
111             }
112             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
113             if (hasOwnership) {
114                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
115                     serviceCloseFutureList.add(service.closeServiceInstance());
116                 }
117             }
118             destroyFuture = Futures.allAsList(serviceCloseFutureList);
119             final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
120             Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease));
121         }
122         return destroyFuture;
123     }
124
125     @SuppressWarnings("checkstyle:IllegalCatch")
126     @Override
127     public void initializationClusterSingletonGroup() {
128         LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
129         boolean needReleaseLock = false;
130         boolean needCloseProviderInstance = false;
131         try {
132             clusterLock.acquire();
133             needReleaseLock = true;
134             Verify.verify(serviceGroup.isEmpty());
135             Verify.verify(!hasOwnership);
136             Verify.verify(serviceEntityCandidateReg == null);
137             serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
138         } catch (final Exception e) {
139             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
140             needCloseProviderInstance = true;
141         } finally {
142             closeResources(needReleaseLock, needCloseProviderInstance);
143         }
144     }
145
146     @SuppressWarnings("checkstyle:IllegalCatch")
147     @Override
148     public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
149         LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
150         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
151         boolean needReleaseLock = false;
152         boolean needCloseProviderInstance = false;
153         ClusterSingletonServiceRegistrationDelegator reg = null;
154         try {
155             clusterLock.acquire();
156             needReleaseLock = true;
157             Verify.verify(serviceEntityCandidateReg != null);
158             reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
159             serviceGroup.add(reg);
160             if (hasOwnership) {
161                 service.instantiateServiceInstance();
162             }
163         } catch (final Exception e) {
164             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
165             needCloseProviderInstance = true;
166         } finally {
167             closeResources(needReleaseLock, needCloseProviderInstance);
168         }
169         return reg;
170     }
171
172     @SuppressWarnings("checkstyle:IllegalCatch")
173     @Override
174     public void unregisterService(final ClusterSingletonService service) {
175         LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
176         Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
177         boolean needReleaseLock = false;
178         boolean needCloseProviderInstance = false;
179         try {
180             clusterLock.acquire();
181             needReleaseLock = true;
182             serviceGroup.remove(service);
183             if (hasOwnership) {
184                 service.closeServiceInstance();
185             }
186         } catch (final Exception e) {
187             LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
188             needCloseProviderInstance = true;
189         } finally {
190             closeResources(needReleaseLock, needCloseProviderInstance);
191             if (serviceGroup.isEmpty()) {
192                 this.closeClusterSingletonGroup();
193             }
194         }
195     }
196
197     @SuppressWarnings("checkstyle:IllegalCatch")
198     @Override
199     public void ownershipChanged(final C ownershipChange) {
200         LOG.debug("Ownership change {} for ClusterSingletonServiceGrou {}", ownershipChange,
201                 clusterSingletonGroupIdentifier);
202         try {
203             if (ownershipChange.inJeopardy()) {
204                 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
205                 lostOwnership();
206                 return;
207             }
208             if (serviceEntity.equals(ownershipChange.getEntity())) {
209                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
210                     /*
211                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
212                      */
213                     tryToTakeOwnership();
214                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
215                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
216                                 .equals(ownershipChange.getState())) {
217                     /*
218                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
219                      */
220                     lostOwnership();
221                 } else {
222                     /* Not needed notifications */
223                     LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
224                             clusterSingletonGroupIdentifier);
225                 }
226             } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
227                 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
228                     /*
229                      * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
230                      */
231                     takeOwnership();
232                 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
233                         || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
234                                 .equals(ownershipChange.getState())) {
235                     /*
236                      * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
237                      */
238                     LOG.warn("Unexpected lost doubleCandidate {} leadership. Close service instance {}",
239                             doubleCandidateEntity, clusterSingletonGroupIdentifier);
240                     lostOwnership();
241                 } else {
242                     /* Not needed notifications */
243                     LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
244                             ownershipChange, clusterSingletonGroupIdentifier);
245                 }
246             } else {
247                 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
248             }
249         } catch (Exception e) {
250             LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
251             // TODO : think about close ... is it necessary?
252         }
253     }
254
255     /*
256      * Help method to registered DoubleCandidateEntity. It is first step
257      * before the actual instance take Leadership.
258      */
259     @SuppressWarnings("checkstyle:IllegalCatch")
260     private void tryToTakeOwnership() {
261         LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
262         boolean needReleaseLock = false;
263         boolean needCloseProviderInstance = false;
264         try {
265             clusterLock.acquire();
266             needReleaseLock = true;
267             Verify.verify(serviceEntityCandidateReg != null);
268             Verify.verify(asyncCloseEntityCandidateReg == null);
269             asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
270         } catch (Exception e) {
271             LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
272                     clusterSingletonGroupIdentifier, e);
273             needCloseProviderInstance = true;
274         } finally {
275             closeResources(needReleaseLock, needCloseProviderInstance);
276         }
277     }
278
279     /*
280      * Help method calls setupService method for create single cluster-wide service instance.
281      */
282     @SuppressWarnings("checkstyle:IllegalCatch")
283     private void takeOwnership() {
284         LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
285         boolean needReleaseLock = false;
286         boolean needCloseProviderInstance = false;
287         try {
288             clusterLock.acquire();
289             needReleaseLock = true;
290             Verify.verify(serviceEntityCandidateReg != null);
291             Verify.verify(asyncCloseEntityCandidateReg != null);
292             for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
293                 service.instantiateServiceInstance();
294             }
295             hasOwnership = true;
296         } catch (final Exception e) {
297             LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
298                     clusterSingletonGroupIdentifier, e);
299             needCloseProviderInstance = true;
300         } finally {
301             closeResources(needReleaseLock, needCloseProviderInstance);
302         }
303     }
304
305     /*
306      * Help method calls suspendService method for stop this single cluster-wide service instance.
307      * The last async. step has to close DoubleCandidateRegistration reference what should initialize
308      * new election for DoubleCandidateEntity.
309      */
310     @SuppressWarnings("checkstyle:IllegalCatch")
311     private void lostOwnership() {
312         LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
313         boolean needReleaseLock = false;
314         boolean needCloseProviderInstance = false;
315         try {
316             clusterLock.acquire();
317             needReleaseLock = true;
318             Verify.verify(serviceEntityCandidateReg != null);
319             final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
320             if (hasOwnership) {
321                 Verify.verify(asyncCloseEntityCandidateReg != null);
322                 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
323                     serviceCloseFutureList.add(service.closeServiceInstance());
324                 }
325             }
326
327             final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
328             Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock));
329             /*
330              * We wish to stop all possible EOS activities before we don't close
331              * a close candidate registration that acts as a guard. So we don't want
332              * to release Semaphore (clusterLock) before we are not fully finished.
333              * Semaphore lock release has to be realized as FutureCallback after a service
334              * instance has fully closed prior to relinquishing service ownership.
335              */
336             needReleaseLock = false;
337         } catch (final Exception e) {
338             LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
339                     clusterSingletonGroupIdentifier, e);
340             needCloseProviderInstance = true;
341         } finally {
342             closeResources(needReleaseLock, needCloseProviderInstance);
343         }
344     }
345
346     /*
347      * Help method for finalization every acquired functionality
348      */
349     private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
350         if (needReleaseLock) {
351             clusterLock.release();
352         }
353         if (needCloseProvider) {
354             final ListenableFuture<List<Void>> closeFutureList = this.closeClusterSingletonGroup();
355             Futures.addCallback(closeFutureList, new FutureCallback<List<Void>>() {
356
357                 @Override
358                 public void onSuccess(final List<Void> result) {
359                     removeThisGroupFromProvider(null);
360                 }
361
362                 @Override
363                 public void onFailure(final Throwable throwable) {
364                     removeThisGroupFromProvider(throwable);
365                 }
366             });
367         }
368     }
369
370     /*
371      * Help method for final ClusterSingletonGroup removing
372      */
373     protected void removeThisGroupFromProvider(@Nullable final Throwable throwable) {
374         LOG.debug("Removing ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
375         if (throwable != null) {
376             LOG.warn("Unexpected problem by closingResources ClusterSingletonServiceGroup {}",
377                     clusterSingletonGroupIdentifier);
378         }
379         allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
380     }
381
382     /*
383      * Help method creates FutureCallback for suspend Future
384      */
385     private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore) {
386         final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
387             if (throwable != null) {
388                 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
389             } else {
390                 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
391             }
392             if (asyncCloseEntityCandidateReg != null) {
393                 asyncCloseEntityCandidateReg.close();
394                 asyncCloseEntityCandidateReg = null;
395             }
396             if (semaphore != null) {
397                 semaphore.release();
398             }
399             allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
400         };
401
402         return new FutureCallback<List<Void>>() {
403
404             @Override
405             public void onSuccess(final List<Void> result) {
406                 closeEntityCandidateRegistration.accept(null);
407             }
408
409             @Override
410             public void onFailure(final Throwable throwable) {
411                 closeEntityCandidateRegistration.accept(throwable);
412             }
413         };
414     }
415
416 }