2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.yangtools.concepts.Path;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Implementation of {@link ClusterSingletonServiceGroup}.
43 * @param <P> the instance identifier path type
44 * @param <E> the GenericEntity type
45 * @param <C> the GenericEntityOwnershipChange type
46 * @param <G> the GenericEntityOwnershipListener type
47 * @param <S> the GenericEntityOwnershipService type
50 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
51 C extends GenericEntityOwnershipChange<P, E>,
52 G extends GenericEntityOwnershipListener<P, C>,
53 S extends GenericEntityOwnershipService<P, E, G>>
54 implements ClusterSingletonServiceGroup<P, E, C> {
56 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
58 private final S entityOwnershipService;
59 private final String clusterSingletonGroupIdentifier;
60 private final Semaphore clusterLock = new Semaphore(1, true);
62 /* Entity instances */
63 private final E serviceEntity;
64 private final E doubleCandidateEntity;
66 // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
67 // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
68 // FOLLOWER : baseCandidate is registered correctly
69 // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
70 // LEADER : both candidate have mastership from EOS
71 // TERMINATED : service go down
72 @GuardedBy("clusterLock")
73 private boolean hasOwnership = false;
74 @GuardedBy("clusterLock")
75 private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
76 private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
78 /* EOS Candidate Registrations */
79 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
80 private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
85 * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
86 * @param mainEntity as Entity instance
87 * @param closeEntity as Entity instance
88 * @param entityOwnershipService GenericEntityOwnershipService instance
89 * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type
91 ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
92 final E closeEntity, final S entityOwnershipService,
93 final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
94 LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
95 Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
96 this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
97 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
98 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
99 this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
100 this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
103 @SuppressWarnings("checkstyle:IllegalCatch")
105 public ListenableFuture<List<Void>> closeClusterSingletonGroup() {
106 LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
107 boolean needReleaseLock = false;
108 final ListenableFuture<List<Void>> destroyFuture;
110 needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
111 } catch (final Exception e) {
112 LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
115 if (serviceEntityCandidateReg != null) {
116 serviceEntityCandidateReg.close();
117 serviceEntityCandidateReg = null;
119 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
121 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
123 serviceCloseFutureList.add(service.closeServiceInstance());
124 } catch (final RuntimeException e) {
125 LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
126 service.getIdentifier());
129 hasOwnership = false;
131 destroyFuture = Futures.allAsList(serviceCloseFutureList);
132 final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
133 Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
135 return destroyFuture;
138 @SuppressWarnings("checkstyle:IllegalCatch")
140 public void initializationClusterSingletonGroup() {
141 LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
142 boolean needReleaseLock = false;
143 boolean needCloseProviderInstance = false;
145 clusterLock.acquire();
146 needReleaseLock = true;
147 Verify.verify(serviceGroup.isEmpty());
148 Verify.verify(!hasOwnership);
149 Verify.verify(serviceEntityCandidateReg == null);
150 serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
151 } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
152 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
153 needCloseProviderInstance = true;
154 throw new RuntimeException(e);
156 closeResources(needReleaseLock, needCloseProviderInstance);
160 @SuppressWarnings("checkstyle:IllegalCatch")
162 public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
163 LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
164 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
165 boolean needReleaseLock = false;
166 boolean needCloseProviderInstance = false;
167 ClusterSingletonServiceRegistrationDelegator reg = null;
169 clusterLock.acquire();
170 needReleaseLock = true;
171 Verify.verify(serviceEntityCandidateReg != null);
172 reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
173 serviceGroup.add(reg);
175 service.instantiateServiceInstance();
177 } catch (final RuntimeException | InterruptedException e) {
178 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
179 needCloseProviderInstance = true;
180 throw new RuntimeException(e);
182 closeResources(needReleaseLock, needCloseProviderInstance);
187 @SuppressWarnings("checkstyle:IllegalCatch")
189 public void unregisterService(final ClusterSingletonService service) {
190 LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
191 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
192 boolean needReleaseLock = false;
193 boolean needCloseProviderInstance = false;
195 clusterLock.acquire();
196 needReleaseLock = true;
197 if (serviceGroup.size() > 1) {
199 service.closeServiceInstance();
201 serviceGroup.remove(service);
202 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
204 needCloseProviderInstance = true;
206 } catch (final RuntimeException | InterruptedException e) {
207 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
208 needCloseProviderInstance = true;
209 throw new RuntimeException(e);
211 closeResources(needReleaseLock, needCloseProviderInstance);
215 @SuppressWarnings("checkstyle:IllegalCatch")
217 public void ownershipChanged(final C ownershipChange) {
218 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
219 clusterSingletonGroupIdentifier);
221 if (ownershipChange.inJeopardy()) {
222 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
226 if (serviceEntity.equals(ownershipChange.getEntity())) {
227 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
229 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
231 tryToTakeOwnership();
232 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
233 || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
234 .equals(ownershipChange.getState())) {
236 * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
240 /* Not needed notifications */
241 LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
242 clusterSingletonGroupIdentifier);
244 } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
245 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
247 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
251 /* Not needed notifications */
252 LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
253 ownershipChange, clusterSingletonGroupIdentifier);
256 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
258 } catch (final Exception e) {
259 LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
260 // TODO : think about close ... is it necessary?
265 * Help method to registered DoubleCandidateEntity. It is first step
266 * before the actual instance take Leadership.
268 @SuppressWarnings("checkstyle:IllegalCatch")
269 private void tryToTakeOwnership() {
270 LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
271 boolean needReleaseLock = false;
272 boolean needCloseProviderInstance = false;
274 clusterLock.acquire();
275 needReleaseLock = true;
276 if (serviceEntityCandidateReg != null) {
277 Verify.verify(asyncCloseEntityCandidateReg == null);
278 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
280 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
282 } catch (final Exception e) {
283 LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
284 clusterSingletonGroupIdentifier, e);
285 needCloseProviderInstance = true;
287 closeResources(needReleaseLock, needCloseProviderInstance);
292 * Help method calls setupService method for create single cluster-wide service instance.
294 @SuppressWarnings("checkstyle:IllegalCatch")
295 private void takeOwnership() {
296 LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
297 boolean needReleaseLock = false;
298 boolean needCloseProviderInstance = false;
300 clusterLock.acquire();
301 needReleaseLock = true;
302 if (serviceEntityCandidateReg != null) {
303 Verify.verify(asyncCloseEntityCandidateReg != null);
304 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
305 service.instantiateServiceInstance();
309 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
311 } catch (final RuntimeException | InterruptedException e) {
312 LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
313 clusterSingletonGroupIdentifier, e);
314 needCloseProviderInstance = true;
316 closeResources(needReleaseLock, needCloseProviderInstance);
321 * Help method calls suspendService method for stop this single cluster-wide service instance.
322 * The last async. step has to close DoubleCandidateRegistration reference what should initialize
323 * new election for DoubleCandidateEntity.
325 @SuppressWarnings("checkstyle:IllegalCatch")
326 private void lostOwnership() {
327 LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
328 boolean needReleaseLock = false;
329 boolean needCloseProviderInstance = false;
331 clusterLock.acquire();
332 needReleaseLock = true;
333 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
335 Verify.verify(asyncCloseEntityCandidateReg != null);
336 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
338 serviceCloseFutureList.add(service.closeServiceInstance());
339 } catch (final RuntimeException e) {
340 LOG.error("Unexpected exception while closing service: {}, resuming with next..",
341 service.getIdentifier());
344 hasOwnership = false;
347 final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
348 if (serviceEntityCandidateReg != null) {
349 // we don't want to remove this instance from map
350 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
352 // we have to remove this ClusterSingletonServiceGroup instance from map
353 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
356 * We wish to stop all possible EOS activities before we don't close
357 * a close candidate registration that acts as a guard. So we don't want
358 * to release Semaphore (clusterLock) before we are not fully finished.
359 * Semaphore lock release has to be realized as FutureCallback after a service
360 * instance has fully closed prior to relinquishing service ownership.
362 needReleaseLock = false;
363 } catch (final InterruptedException e) {
364 LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
365 clusterSingletonGroupIdentifier, e);
366 needCloseProviderInstance = true;
368 closeResources(needReleaseLock, needCloseProviderInstance);
373 * Help method for finalization every acquired functionality
375 @GuardedBy("clusterLock")
376 private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
377 if (needCloseProvider) {
378 // The Game Over for this ClusterSingletonServiceGroup instance
379 if (serviceEntityCandidateReg != null) {
380 serviceEntityCandidateReg.close();
381 serviceEntityCandidateReg = null;
383 // Remove instance immediately because actual state is follower or initialization
384 if (asyncCloseEntityCandidateReg == null) {
385 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
389 if (needReleaseLock) {
390 clusterLock.release();
395 * Help method creates FutureCallback for suspend Future
397 private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
398 final boolean isInCloseProcess) {
399 final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
400 if (throwable != null) {
401 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
403 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
405 if (asyncCloseEntityCandidateReg != null) {
406 asyncCloseEntityCandidateReg.close();
407 asyncCloseEntityCandidateReg = null;
409 if (isInCloseProcess) {
410 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
412 if (semaphore != null) {
417 return new FutureCallback<List<Void>>() {
420 public void onSuccess(final List<Void> result) {
421 closeEntityCandidateRegistration.accept(null);
425 public void onFailure(final Throwable throwable) {
426 closeEntityCandidateRegistration.accept(throwable);