2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.singleton.dom.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Strings;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import javax.annotation.Nullable;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link ClusterSingletonServiceGroup}.
 *
 * @param <P> the instance identifier path type
 * @param <E> the GenericEntity type
 * @param <C> the GenericEntityOwnershipChange type
 * @param <G> the GenericEntityOwnershipListener type
 * @param <S> the GenericEntityOwnershipService type
 */
49 final class ClusterSingletonServiceGroupImpl<P extends Path<P>, E extends GenericEntity<P>,
50 C extends GenericEntityOwnershipChange<P, E>,
51 G extends GenericEntityOwnershipListener<P, C>,
52 S extends GenericEntityOwnershipService<P, E, G>>
53 implements ClusterSingletonServiceGroup<P, E, C> {
55 private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName());
57 private final S entityOwnershipService;
58 private final String clusterSingletonGroupIdentifier;
59 private final Semaphore clusterLock = new Semaphore(1, true);
61 /* Entity instances */
62 private final E serviceEntity;
63 private final E doubleCandidateEntity;
65 // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
66 // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
67 // FOLLOWER : baseCandidate is registered correctly
68 // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
69 // LEADER : both candidate have mastership from EOS
70 // TERMINATED : service go down
71 @GuardedBy("clusterLock")
72 private boolean hasOwnership = false;
73 @GuardedBy("clusterLock")
74 private final List<ClusterSingletonServiceRegistrationDelegator> serviceGroup = new LinkedList<>();
75 private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups;
77 /* EOS Candidate Registrations */
78 private GenericEntityOwnershipCandidateRegistration<P, E> serviceEntityCandidateReg;
79 private GenericEntityOwnershipCandidateRegistration<P, E> asyncCloseEntityCandidateReg;
84 * @param clusterSingletonServiceGroupIdentifier not empty string as identifier
87 * @param entityOwnershipService
89 ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity,
90 final E closeEntity, final S entityOwnershipService,
91 final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> allServiceGroups) {
92 LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier);
93 Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier));
94 this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier;
95 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
96 this.serviceEntity = Preconditions.checkNotNull(mainEntity);
97 this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity);
98 this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups);
102 public final ListenableFuture<List<Void>> closeClusterSingletonGroup() {
103 LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier);
104 boolean needReleaseLock = false;
105 final ListenableFuture<List<Void>> destroyFuture;
107 needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
108 } catch (final Exception e) {
109 LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
112 if (serviceEntityCandidateReg != null) {
113 serviceEntityCandidateReg.close();
114 serviceEntityCandidateReg = null;
116 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
118 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
119 serviceCloseFutureList.add(service.closeServiceInstance());
121 hasOwnership = false;
123 destroyFuture = Futures.allAsList(serviceCloseFutureList);
124 final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
125 Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
127 return destroyFuture;
131 public final void initializationClusterSingletonGroup() {
132 LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier);
133 boolean needReleaseLock = false;
134 boolean needCloseProviderInstance = false;
136 clusterLock.acquire();
137 needReleaseLock = true;
138 Verify.verify(serviceGroup.isEmpty());
139 Verify.verify(!hasOwnership);
140 Verify.verify(serviceEntityCandidateReg == null);
141 serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
142 } catch (final Exception e) {
143 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
144 needCloseProviderInstance = true;
145 throw new RuntimeException(e);
147 closeResources(needReleaseLock, needCloseProviderInstance);
152 public final ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) {
153 LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
154 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
155 boolean needReleaseLock = false;
156 boolean needCloseProviderInstance = false;
157 ClusterSingletonServiceRegistrationDelegator reg = null;
159 clusterLock.acquire();
160 needReleaseLock = true;
161 Verify.verify(serviceEntityCandidateReg != null);
162 reg = new ClusterSingletonServiceRegistrationDelegator(service, this);
163 serviceGroup.add(reg);
165 service.instantiateServiceInstance();
167 } catch (final Exception e) {
168 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
169 needCloseProviderInstance = true;
170 throw new RuntimeException(e);
172 closeResources(needReleaseLock, needCloseProviderInstance);
178 public final void unregisterService(final ClusterSingletonService service) {
179 LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
180 Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue()));
181 boolean needReleaseLock = false;
182 boolean needCloseProviderInstance = false;
184 clusterLock.acquire();
185 needReleaseLock = true;
186 if (serviceGroup.size() > 1) {
188 service.closeServiceInstance();
190 serviceGroup.remove(service);
191 LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
193 needCloseProviderInstance = true;
195 } catch (final Exception e) {
196 LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
197 needCloseProviderInstance = true;
198 throw new RuntimeException(e);
200 closeResources(needReleaseLock, needCloseProviderInstance);
205 public void ownershipChanged(final C ownershipChange) {
206 LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
207 clusterSingletonGroupIdentifier);
209 if (ownershipChange.inJeopardy()) {
210 LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange);
214 if (serviceEntity.equals(ownershipChange.getEntity())) {
215 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
217 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
219 tryToTakeOwnership();
220 } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
221 || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
222 .equals(ownershipChange.getState())) {
224 * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
228 /* Not needed notifications */
229 LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange,
230 clusterSingletonGroupIdentifier);
232 } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) {
233 if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) {
235 * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
239 /* Not needed notifications */
240 LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
241 ownershipChange, clusterSingletonGroupIdentifier);
244 LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange);
246 } catch (final Exception e) {
247 LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e);
248 // TODO : think about close ... is it necessary?
253 * Help method to registered DoubleCandidateEntity. It is first step
254 * before the actual instance take Leadership.
256 private void tryToTakeOwnership() {
257 LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
258 boolean needReleaseLock = false;
259 boolean needCloseProviderInstance = false;
261 clusterLock.acquire();
262 needReleaseLock = true;
263 if (serviceEntityCandidateReg != null) {
264 Verify.verify(asyncCloseEntityCandidateReg == null);
265 asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
267 LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
269 } catch (final Exception e) {
270 LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
271 clusterSingletonGroupIdentifier, e);
272 needCloseProviderInstance = true;
274 closeResources(needReleaseLock, needCloseProviderInstance);
279 * Help method calls setupService method for create single cluster-wide service instance.
281 private void takeOwnership() {
282 LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
283 boolean needReleaseLock = false;
284 boolean needCloseProviderInstance = false;
286 clusterLock.acquire();
287 needReleaseLock = true;
288 if (serviceEntityCandidateReg != null) {
289 Verify.verify(asyncCloseEntityCandidateReg != null);
290 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
291 service.instantiateServiceInstance();
295 LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
297 } catch (final Exception e) {
298 LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
299 clusterSingletonGroupIdentifier, e);
300 needCloseProviderInstance = true;
302 closeResources(needReleaseLock, needCloseProviderInstance);
307 * Help method calls suspendService method for stop this single cluster-wide service instance.
308 * The last async. step has to close DoubleCandidateRegistration reference what should initialize
309 * new election for DoubleCandidateEntity.
311 private void lostOwnership() {
312 LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier);
313 boolean needReleaseLock = false;
314 boolean needCloseProviderInstance = false;
316 clusterLock.acquire();
317 needReleaseLock = true;
318 final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
320 Verify.verify(asyncCloseEntityCandidateReg != null);
321 for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
322 serviceCloseFutureList.add(service.closeServiceInstance());
324 hasOwnership = false;
327 final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
328 if (serviceEntityCandidateReg != null) {
329 // we don't want to remove this instance from map
330 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
332 // we have to remove this ClusterSingletonServiceGroup instance from map
333 Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
336 * We wish to stop all possible EOS activities before we don't close
337 * a close candidate registration that acts as a guard. So we don't want
338 * to release Semaphore (clusterLock) before we are not fully finished.
339 * Semaphore lock release has to be realized as FutureCallback after a service
340 * instance has fully closed prior to relinquishing service ownership.
342 needReleaseLock = false;
343 } catch (final Exception e) {
344 LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
345 clusterSingletonGroupIdentifier, e);
346 needCloseProviderInstance = true;
348 closeResources(needReleaseLock, needCloseProviderInstance);
353 * Help method for finalization every acquired functionality
355 @GuardedBy("clusterLock")
356 private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
357 if (needCloseProvider) {
358 // The Game Over for this ClusterSingletonServiceGroup instance
359 if (serviceEntityCandidateReg != null) {
360 serviceEntityCandidateReg.close();
361 serviceEntityCandidateReg = null;
363 // Remove instance immediately because actual state is follower or initialization
364 if (asyncCloseEntityCandidateReg == null) {
365 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
369 if (needReleaseLock) {
370 clusterLock.release();
375 * Help method creates FutureCallback for suspend Future
377 private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
378 final boolean isInCloseProcess) {
379 final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
380 if (throwable != null) {
381 LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
383 LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
385 if (asyncCloseEntityCandidateReg != null) {
386 asyncCloseEntityCandidateReg.close();
387 asyncCloseEntityCandidateReg = null;
389 if (isInCloseProcess) {
390 allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
392 if (semaphore != null) {
397 return new FutureCallback<List<Void>>() {
400 public void onSuccess(final List<Void> result) {
401 closeEntityCandidateRegistration.accept(null);
405 public void onFailure(final Throwable t) {
406 closeEntityCandidateRegistration.accept(t);