/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.mdsal.singleton.dom.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState; import org.opendaylight.mdsal.eos.common.api.GenericEntity; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yangtools.concepts.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Implementation of {@link ClusterSingletonServiceGroup}. * * @param

the instance identifier path type * @param the GenericEntity type * @param the GenericEntityOwnershipChange type * @param the GenericEntityOwnershipListener type * @param the GenericEntityOwnershipService type */ @VisibleForTesting final class ClusterSingletonServiceGroupImpl

, E extends GenericEntity

, C extends GenericEntityOwnershipChange, G extends GenericEntityOwnershipListener, S extends GenericEntityOwnershipService> implements ClusterSingletonServiceGroup { private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonServiceGroupImpl.class.getName()); private final S entityOwnershipService; private final String clusterSingletonGroupIdentifier; private final Semaphore clusterLock = new Semaphore(1, true); /* Entity instances */ private final E serviceEntity; private final E doubleCandidateEntity; // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED) // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?) // FOLLOWER : baseCandidate is registered correctly // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly // LEADER : both candidate have mastership from EOS // TERMINATED : service go down @GuardedBy("clusterLock") private boolean hasOwnership = false; @GuardedBy("clusterLock") private final List serviceGroup = new LinkedList<>(); private final ConcurrentMap> allServiceGroups; /* EOS Candidate Registrations */ private GenericEntityOwnershipCandidateRegistration serviceEntityCandidateReg; private GenericEntityOwnershipCandidateRegistration asyncCloseEntityCandidateReg; /** * Class constructor. * * @param clusterSingletonServiceGroupIdentifier not empty string as identifier * @param mainEntity as Entity instance * @param closeEntity as Entity instance * @param entityOwnershipService GenericEntityOwnershipService instance * @param allServiceGroups concurrentMap of String and ClusterSingletonServiceGroup type */ ClusterSingletonServiceGroupImpl(final String clusterSingletonServiceGroupIdentifier, final E mainEntity, final E closeEntity, final S entityOwnershipService, final ConcurrentMap> allServiceGroups) { LOG.debug("New Instance of ClusterSingletonServiceGroup {}", clusterSingletonServiceGroupIdentifier); Preconditions.checkArgument(!Strings.isNullOrEmpty(clusterSingletonServiceGroupIdentifier)); this.clusterSingletonGroupIdentifier = clusterSingletonServiceGroupIdentifier; this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService); this.serviceEntity = Preconditions.checkNotNull(mainEntity); this.doubleCandidateEntity = Preconditions.checkNotNull(closeEntity); this.allServiceGroups = Preconditions.checkNotNull(allServiceGroups); } @SuppressWarnings("checkstyle:IllegalCatch") @Override public ListenableFuture> closeClusterSingletonGroup() { LOG.debug("Close method for service Provider {}", clusterSingletonGroupIdentifier); boolean needReleaseLock = false; final ListenableFuture> destroyFuture; try { needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS); } catch (final Exception e) { LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier, e); } finally { if (serviceEntityCandidateReg != null) { serviceEntityCandidateReg.close(); serviceEntityCandidateReg = null; } final List> serviceCloseFutureList = new ArrayList<>(); if (hasOwnership) { for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) { serviceCloseFutureList.add(service.closeServiceInstance()); } hasOwnership = false; } destroyFuture = Futures.allAsList(serviceCloseFutureList); final Semaphore finalRelease = needReleaseLock ? clusterLock : null; Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true)); } return destroyFuture; } @SuppressWarnings("checkstyle:IllegalCatch") @Override public void initializationClusterSingletonGroup() { LOG.debug("Initialization ClusterSingletonGroup {}", clusterSingletonGroupIdentifier); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; try { clusterLock.acquire(); needReleaseLock = true; Verify.verify(serviceGroup.isEmpty()); Verify.verify(!hasOwnership); Verify.verify(serviceEntityCandidateReg == null); serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity); } catch (final Exception e) { LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; throw new RuntimeException(e); } finally { closeResources(needReleaseLock, needCloseProviderInstance); } } @SuppressWarnings("checkstyle:IllegalCatch") @Override public ClusterSingletonServiceRegistration registerService(final ClusterSingletonService service) { LOG.debug("RegisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier); Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue())); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; ClusterSingletonServiceRegistrationDelegator reg = null; try { clusterLock.acquire(); needReleaseLock = true; Verify.verify(serviceEntityCandidateReg != null); reg = new ClusterSingletonServiceRegistrationDelegator(service, this); serviceGroup.add(reg); if (hasOwnership) { service.instantiateServiceInstance(); } } catch (final Exception e) { LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; throw new RuntimeException(e); } finally { closeResources(needReleaseLock, needCloseProviderInstance); } return reg; } @SuppressWarnings("checkstyle:IllegalCatch") @Override public void unregisterService(final ClusterSingletonService service) { LOG.debug("UnregisterService method call for ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier); Verify.verify(clusterSingletonGroupIdentifier.equals(service.getIdentifier().getValue())); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; try { clusterLock.acquire(); needReleaseLock = true; if (serviceGroup.size() > 1) { if (hasOwnership) { service.closeServiceInstance(); } serviceGroup.remove(service); LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue()); } else { needCloseProviderInstance = true; } } catch (final Exception e) { LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; throw new RuntimeException(e); } finally { closeResources(needReleaseLock, needCloseProviderInstance); } } @SuppressWarnings("checkstyle:IllegalCatch") @Override public void ownershipChanged(final C ownershipChange) { LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange, clusterSingletonGroupIdentifier); try { if (ownershipChange.inJeopardy()) { LOG.warn("Cluster Node lost connection to another cluster nodes {}", ownershipChange); lostOwnership(); return; } if (serviceEntity.equals(ownershipChange.getEntity())) { if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) { /* * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner() */ tryToTakeOwnership(); } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState()) || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER .equals(ownershipChange.getState())) { /* * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner() */ lostOwnership(); } else { /* Not needed notifications */ LOG.debug("Not processed entity OwnershipChange {} in service Provider {}", ownershipChange, clusterSingletonGroupIdentifier); } } else if (doubleCandidateEntity.equals(ownershipChange.getEntity())) { if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_GRANTED.equals(ownershipChange.getState())) { /* * SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner() */ takeOwnership(); } else { /* Not needed notifications */ LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}", ownershipChange, clusterSingletonGroupIdentifier); } } else { LOG.warn("Unexpected EntityOwnershipChangeEvent for entity {}", ownershipChange); } } catch (final Exception e) { LOG.error("Unexpected Exception for service Provider {}", clusterSingletonGroupIdentifier, e); // TODO : think about close ... is it necessary? } } /* * Help method to registered DoubleCandidateEntity. It is first step * before the actual instance take Leadership. */ @SuppressWarnings("checkstyle:IllegalCatch") private void tryToTakeOwnership() { LOG.debug("TryToTakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; try { clusterLock.acquire(); needReleaseLock = true; if (serviceEntityCandidateReg != null) { Verify.verify(asyncCloseEntityCandidateReg == null); asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity); } else { LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier); } } catch (final Exception e) { LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; } finally { closeResources(needReleaseLock, needCloseProviderInstance); } } /* * Help method calls setupService method for create single cluster-wide service instance. */ @SuppressWarnings("checkstyle:IllegalCatch") private void takeOwnership() { LOG.debug("TakeLeadership method for service Provider {}", clusterSingletonGroupIdentifier); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; try { clusterLock.acquire(); needReleaseLock = true; if (serviceEntityCandidateReg != null) { Verify.verify(asyncCloseEntityCandidateReg != null); for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) { service.instantiateServiceInstance(); } hasOwnership = true; } else { LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier); } } catch (final Exception e) { LOG.error("Unexpected exception state for service Provider {} in TakeLeadership", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; } finally { closeResources(needReleaseLock, needCloseProviderInstance); } } /* * Help method calls suspendService method for stop this single cluster-wide service instance. * The last async. step has to close DoubleCandidateRegistration reference what should initialize * new election for DoubleCandidateEntity. */ @SuppressWarnings("checkstyle:IllegalCatch") private void lostOwnership() { LOG.debug("LostLeadership method for service Provider {}", clusterSingletonGroupIdentifier); boolean needReleaseLock = false; boolean needCloseProviderInstance = false; try { clusterLock.acquire(); needReleaseLock = true; final List> serviceCloseFutureList = new ArrayList<>(); if (hasOwnership) { Verify.verify(asyncCloseEntityCandidateReg != null); for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) { serviceCloseFutureList.add(service.closeServiceInstance()); } hasOwnership = false; } final ListenableFuture> destroyFuture = Futures.allAsList(serviceCloseFutureList); if (serviceEntityCandidateReg != null) { // we don't want to remove this instance from map Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false)); } else { // we have to remove this ClusterSingletonServiceGroup instance from map Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true)); } /* * We wish to stop all possible EOS activities before we don't close * a close candidate registration that acts as a guard. So we don't want * to release Semaphore (clusterLock) before we are not fully finished. * Semaphore lock release has to be realized as FutureCallback after a service * instance has fully closed prior to relinquishing service ownership. */ needReleaseLock = false; } catch (final Exception e) { LOG.error("Unexpected exception state for service Provider {} in LostLeadership", clusterSingletonGroupIdentifier, e); needCloseProviderInstance = true; } finally { closeResources(needReleaseLock, needCloseProviderInstance); } } /* * Help method for finalization every acquired functionality */ @GuardedBy("clusterLock") private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) { if (needCloseProvider) { // The Game Over for this ClusterSingletonServiceGroup instance if (serviceEntityCandidateReg != null) { serviceEntityCandidateReg.close(); serviceEntityCandidateReg = null; } // Remove instance immediately because actual state is follower or initialization if (asyncCloseEntityCandidateReg == null) { allServiceGroups.remove(clusterSingletonGroupIdentifier, this); } } if (needReleaseLock) { clusterLock.release(); } } /* * Help method creates FutureCallback for suspend Future */ private FutureCallback> newAsyncCloseCallback(@Nullable final Semaphore semaphore, final boolean isInCloseProcess) { final Consumer closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> { if (throwable != null) { LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable); } else { LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier); } if (asyncCloseEntityCandidateReg != null) { asyncCloseEntityCandidateReg.close(); asyncCloseEntityCandidateReg = null; } if (isInCloseProcess) { allServiceGroups.remove(clusterSingletonGroupIdentifier, this); } if (semaphore != null) { semaphore.release(); } }; return new FutureCallback>() { @Override public void onSuccess(final List result) { closeEntityCandidateRegistration.accept(null); } @Override public void onFailure(final Throwable throwable) { closeEntityCandidateRegistration.accept(throwable); } }; } }