2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Implementation of {@link ClusterSingletonServiceGroup}.
42 * @param <P> the instance identifier path type
43 * @param <E> the GenericEntity type
44 * @param <C> the GenericEntityOwnershipChange type
45 * @param <G> the GenericEntityOwnershipListener type
46 * @param <S> the GenericEntityOwnershipService type
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50 C extends GenericEntityOwnershipChange<P, E>,
51 G extends GenericEntityOwnershipListener<P, C>,
52 S extends GenericEntityOwnershipService<P, E, G>>
53 implements ClusterSingletonServiceGroup<P, E, C> {
55 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
57 private final S entityOwnershipService;
58 private final String clusterSingletonGroupIdentifier;
59 private final Semaphore clusterLock = new Semaphore(1);
61 /* Entity instances */
62 private final E serviceEntity;
63 private final E doubleCandidateEntity;
65 @GuardedBy("clusterLock")
66 private boolean hasOwnership = false;
67 @GuardedBy("clusterLock")
68 private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
69 private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
71 /* EOS Candidate Registrations */
72 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
73 private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
78 * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
79 * @param mainEntity as Entity instance
80 * @param closeEntity as Entity instance
81 * @param entityOwnershipService GenericEntityOwnershipService instance
82 * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
84 ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
85 final E closeEntity, final S entityOwnershipService,
86 final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
87 LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
88 Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
89 this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
90 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
91 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
92 this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
93 this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
96 @SuppressWarnings("checkstyle:IllegalCatch")
98 public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
99 LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
100 boolean needReleaseLock = false;
101 final ListenableFuture<List<Void>> destroyFuture;
103 needReleaseLock = clusterLock.tryAcquire(10, TimeUnit.SECONDS);
104 } catch (final Exception e) {
105 LOG.warn("Unexpected Exception for service Provider {} in closing phase.",
106 clusterSingletonGroupIdentifier, e);
108 if (serviceEntityCandidateReg != null) {
109 serviceEntityCandidateReg.close();
110 serviceEntityCandidateReg = null;
112 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
114 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
115 serviceCloseFutureList.add(service.closeServiceInstance());
118 destroyFuture = Futures.allAsList(serviceCloseFutureList);
119 final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
120 Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease));
122 return destroyFuture;
125 @SuppressWarnings("checkstyle:IllegalCatch")
127 public void initializationClusterSingletonGroup() {
128 LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
129 boolean needReleaseLock = false;
130 boolean needCloseProviderInstance = false;
132 clusterLock.acquire();
133 needReleaseLock = true;
134 Verify.verify(serviceGroup.isEmpty());
135 Verify.verify(!hasOwnership);
136 Verify.verify(serviceEntityCandidateReg == null);
137 serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
138 } catch (final Exception e) {
139 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
140 needCloseProviderInstance = true;
142 closeResources(needReleaseLock, needCloseProviderInstance);
146 @SuppressWarnings("checkstyle:IllegalCatch")
148 public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
149 LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
150 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
151 boolean needReleaseLock = false;
152 boolean needCloseProviderInstance = false;
153 ClusterSingletonServiceRegistrationDelegator reg = null;
155 clusterLock.acquire();
156 needReleaseLock = true;
157 Verify.verify(serviceEntityCandidateReg != null);
158 reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
159 serviceGroup.add(reg);
161 service.instantiateServiceInstance();
163 } catch (final Exception e) {
164 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
165 needCloseProviderInstance = true;
167 closeResources(needReleaseLock, needCloseProviderInstance);
172 @SuppressWarnings("checkstyle:IllegalCatch")
174 public void unregisterService(final ClusterSingletonService service) {
175 LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
176 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
177 boolean needReleaseLock = false;
178 boolean needCloseProviderInstance = false;
180 clusterLock.acquire();
181 needReleaseLock = true;
182 serviceGroup.remove(service);
184 service.closeServiceInstance();
186 } catch (final Exception e) {
187 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
188 needCloseProviderInstance = true;
190 closeResources(needReleaseLock, needCloseProviderInstance);
191 if (serviceGroup.isEmpty()) {
192 this.closeClusterSingletonGroup();
197 @SuppressWarnings("checkstyle:IllegalCatch")
199 public void ownershipChanged(final C ownershipChange) {
200 LOG.debug("Ownership change {} for ClusterSingletonServiceGrou {}", ownershipChange,
201 clusterSingletonGroupIdentifier);
203 if (ownershipChange.inJeopardy()) {
204 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
208 if (serviceEntity.equals(ownershipChange.getEntity())) {
209 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
211 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
213 tryToTakeOwnership();
214 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
215 || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
216 .equals(ownershipChange.getState())) {
218 * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
222 /* Not needed notifications */
223 LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
224 clusterSingletonGroupIdentifier);
226 } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
227 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
229 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
232 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
233 || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
234 .equals(ownershipChange.getState())) {
236 * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
238 LOG.warn("Unexpected lost doubleCandidate {} leadership. Close service instance {}",
239 doubleCandidateEntity, clusterSingletonGroupIdentifier);
242 /* Not needed notifications */
243 LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
244 ownershipChange, clusterSingletonGroupIdentifier);
247 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
249 } catch (Exception e) {
250 LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
251 // TODO : think about close ... is it necessary?
256 * Help method to registered DoubleCandidateEntity. It is first step
257 * before the actual instance take Leadership.
259 @SuppressWarnings("checkstyle:IllegalCatch")
260 private void tryToTakeOwnership() {
261 LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
262 boolean needReleaseLock = false;
263 boolean needCloseProviderInstance = false;
265 clusterLock.acquire();
266 needReleaseLock = true;
267 Verify.verify(serviceEntityCandidateReg != null);
268 Verify.verify(asyncCloseEntityCandidateReg == null);
269 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
270 } catch (Exception e) {
271 LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
272 clusterSingletonGroupIdentifier, e);
273 needCloseProviderInstance = true;
275 closeResources(needReleaseLock, needCloseProviderInstance);
280 * Help method calls setupService method for create single cluster-wide service instance.
282 @SuppressWarnings("checkstyle:IllegalCatch")
283 private void takeOwnership() {
284 LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
285 boolean needReleaseLock = false;
286 boolean needCloseProviderInstance = false;
288 clusterLock.acquire();
289 needReleaseLock = true;
290 Verify.verify(serviceEntityCandidateReg != null);
291 Verify.verify(asyncCloseEntityCandidateReg != null);
292 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
293 service.instantiateServiceInstance();
296 } catch (final Exception e) {
297 LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
298 clusterSingletonGroupIdentifier, e);
299 needCloseProviderInstance = true;
301 closeResources(needReleaseLock, needCloseProviderInstance);
306 * Help method calls suspendService method for stop this single cluster-wide service instance.
307 * The last async. step has to close DoubleCandidateRegistration reference what should initialize
308 * new election for DoubleCandidateEntity.
310 @SuppressWarnings("checkstyle:IllegalCatch")
311 private void lostOwnership() {
312 LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
313 boolean needReleaseLock = false;
314 boolean needCloseProviderInstance = false;
316 clusterLock.acquire();
317 needReleaseLock = true;
318 Verify.verify(serviceEntityCandidateReg != null);
319 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
321 Verify.verify(asyncCloseEntityCandidateReg != null);
322 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
323 serviceCloseFutureList.add(service.closeServiceInstance());
327 final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
328 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock));
330 * We wish to stop all possible EOS activities before we don't close
331 * a close candidate registration that acts as a guard. So we don't want
332 * to release Semaphore (clusterLock) before we are not fully finished.
333 * Semaphore lock release has to be realized as FutureCallback after a service
334 * instance has fully closed prior to relinquishing service ownership.
336 needReleaseLock = false;
337 } catch (final Exception e) {
338 LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
339 clusterSingletonGroupIdentifier, e);
340 needCloseProviderInstance = true;
342 closeResources(needReleaseLock, needCloseProviderInstance);
347 * Help method for finalization every acquired functionality
349 private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
350 if (needReleaseLock) {
351 clusterLock.release();
353 if (needCloseProvider) {
354 final ListenableFuture<List<Void>> closeFutureList = this.closeClusterSingletonGroup();
355 Futures.addCallback(closeFutureList, new FutureCallback<List<Void>>() {
358 public void onSuccess(final List<Void> result) {
359 removeThisGroupFromProvider(null);
363 public void onFailure(final Throwable throwable) {
364 removeThisGroupFromProvider(throwable);
371 * Help method for final ClusterSingletonGroup removing
373 protected void removeThisGroupFromProvider(@Nullable final Throwable throwable) {
374 LOG.debug("Removing ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
375 if (throwable != null) {
376 LOG.warn("Unexpected problem by closingResources ClusterSingletonServiceGroup {}",
377 clusterSingletonGroupIdentifier);
379 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
383 * Help method creates FutureCallback for suspend Future
385 private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore) {
386 final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
387 if (throwable != null) {
388 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
390 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
392 if (asyncCloseEntityCandidateReg != null) {
393 asyncCloseEntityCandidateReg.close();
394 asyncCloseEntityCandidateReg = null;
396 if (semaphore != null) {
399 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
402 return new FutureCallback<List<Void>>() {
405 public void onSuccess(final List<Void> result) {
406 closeEntityCandidateRegistration.accept(null);
410 public void onFailure(final Throwable throwable) {
411 closeEntityCandidateRegistration.accept(throwable);