import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipCandidateRegistration;
private final S entityOwnershipService;
private final String clusterSingletonGroupIdentifier;
- private final Semaphore clusterLock = new Semaphore(1);
+ private final Semaphore clusterLock = new Semaphore(1, true);
/* Entity instances */
private final E serviceEntity;
private final E doubleCandidateEntity;
+ // TODO :it needs to rewrite for StateMachine (INITIALIZED, TRY_TO_TAKE_LEADERSHIP, LEADER, FOLLOWER, TERMINATED)
+ // INITIALIZED : we have registered baseCandidate and we are waiting for first EOS response (!do we really need it?)
+ // FOLLOWER : baseCandidate is registered correctly
+ // TRY_TO_TAKE_LEADERSHIP : guardCandidate is registered correctly
+ // LEADER : both candidate have mastership from EOS
+ // TERMINATED : service go down
@GuardedBy("clusterLock")
private boolean hasOwnership = false;
@GuardedBy("clusterLock")
boolean needReleaseLock = false;
final ListenableFuture<List<Void>> destroyFuture;
try {
- needReleaseLock = clusterLock.tryAcquire(10, TimeUnit.SECONDS);
+ needReleaseLock = clusterLock.tryAcquire(1, TimeUnit.SECONDS);
} catch (final Exception e) {
- LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier, e);
+ LOG.warn("Unexpected Exception for service Provider {} in closing phase.", clusterSingletonGroupIdentifier,
+ e);
} finally {
if (serviceEntityCandidateReg != null) {
serviceEntityCandidateReg.close();
final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
if (hasOwnership) {
for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- serviceCloseFutureList.add(service.closeServiceInstance());
+ try {
+ serviceCloseFutureList.add(service.closeServiceInstance());
+ } catch (final RuntimeException e) {
+ LOG.warn("Unexpected exception while closing service: {}, resuming with next..",
+ service.getIdentifier());
+ }
}
+ hasOwnership = false;
}
destroyFuture = Futures.allAsList(serviceCloseFutureList);
final Semaphore finalRelease = needReleaseLock ? clusterLock : null;
- Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease));
+ Futures.addCallback(destroyFuture, newAsyncCloseCallback(finalRelease, true));
}
return destroyFuture;
}
Verify.verify(!hasOwnership);
Verify.verify(serviceEntityCandidateReg == null);
serviceEntityCandidateReg = entityOwnershipService.registerCandidate(serviceEntity);
- } catch (final Exception e) {
+ } catch (final RuntimeException | InterruptedException | CandidateAlreadyRegisteredException e) {
LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
needCloseProviderInstance = true;
+ throw new RuntimeException(e);
} finally {
closeResources(needReleaseLock, needCloseProviderInstance);
}
if (hasOwnership) {
service.instantiateServiceInstance();
}
- } catch (final Exception e) {
+ } catch (final RuntimeException | InterruptedException e) {
LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
needCloseProviderInstance = true;
+ throw new RuntimeException(e);
} finally {
closeResources(needReleaseLock, needCloseProviderInstance);
}
try {
clusterLock.acquire();
needReleaseLock = true;
- serviceGroup.remove(service);
- if (hasOwnership) {
- service.closeServiceInstance();
+ if (serviceGroup.size() > 1) {
+ if (hasOwnership) {
+ service.closeServiceInstance();
+ }
+ serviceGroup.remove(service);
+ LOG.debug("Service {} was removed from group.", service.getIdentifier().getValue());
+ } else {
+ needCloseProviderInstance = true;
}
- } catch (final Exception e) {
+ } catch (final RuntimeException | InterruptedException e) {
LOG.debug("Unexpected error by registration service Provider {}", clusterSingletonGroupIdentifier, e);
needCloseProviderInstance = true;
+ throw new RuntimeException(e);
} finally {
closeResources(needReleaseLock, needCloseProviderInstance);
- if (serviceGroup.isEmpty()) {
- this.closeClusterSingletonGroup();
- }
}
}
@Override
- public final void ownershipChanged(final C ownershipChange) {
- LOG.debug("Ownership change {} for ClusterSingletonServiceGrou {}", ownershipChange,
+ public void ownershipChanged(final C ownershipChange) {
+ LOG.debug("Ownership change {} for ClusterSingletonServiceGroup {}", ownershipChange,
clusterSingletonGroupIdentifier);
try {
if (ownershipChange.inJeopardy()) {
* SLAVE to MASTER : ownershipChange.getState().isOwner() && !ownershipChange.getState().wasOwner()
*/
takeOwnership();
- } else if (EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NEW_OWNER.equals(ownershipChange.getState())
- || EntityOwnershipChangeState.LOCAL_OWNERSHIP_LOST_NO_OWNER
- .equals(ownershipChange.getState())) {
- /*
- * MASTER to SLAVE : !ownershipChange.getState().isOwner() && ownershipChange.getState().wasOwner()
- */
- LOG.warn("Unexpected lost doubleCandidate {} leadership. Close service instance {}",
- doubleCandidateEntity, clusterSingletonGroupIdentifier);
- lostOwnership();
} else {
/* Not needed notifications */
LOG.debug("Not processed doubleCandidate OwnershipChange {} in service Provider {}",
try {
clusterLock.acquire();
needReleaseLock = true;
- Verify.verify(serviceEntityCandidateReg != null);
- Verify.verify(asyncCloseEntityCandidateReg == null);
- asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
+ if (serviceEntityCandidateReg != null) {
+ Verify.verify(asyncCloseEntityCandidateReg == null);
+ asyncCloseEntityCandidateReg = entityOwnershipService.registerCandidate(doubleCandidateEntity);
+ } else {
+ LOG.debug("Service {} is closed, so don't to tryTakeLeadership", clusterSingletonGroupIdentifier);
+ }
} catch (final Exception e) {
LOG.error("Unexpected exception state for service Provider {} in TryToTakeLeadership",
clusterSingletonGroupIdentifier, e);
try {
clusterLock.acquire();
needReleaseLock = true;
- Verify.verify(serviceEntityCandidateReg != null);
- Verify.verify(asyncCloseEntityCandidateReg != null);
- for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- service.instantiateServiceInstance();
+ if (serviceEntityCandidateReg != null) {
+ Verify.verify(asyncCloseEntityCandidateReg != null);
+ for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
+ service.instantiateServiceInstance();
+ }
+ hasOwnership = true;
+ } else {
+ LOG.debug("Service {} is closed, so don't take leadership", clusterSingletonGroupIdentifier);
}
- hasOwnership = true;
- } catch (final Exception e) {
+ } catch (final RuntimeException | InterruptedException e) {
LOG.error("Unexpected exception state for service Provider {} in TakeLeadership",
clusterSingletonGroupIdentifier, e);
needCloseProviderInstance = true;
try {
clusterLock.acquire();
needReleaseLock = true;
- Verify.verify(serviceEntityCandidateReg != null);
final List<ListenableFuture<Void>> serviceCloseFutureList = new ArrayList<>();
if (hasOwnership) {
Verify.verify(asyncCloseEntityCandidateReg != null);
for (final ClusterSingletonServiceRegistrationDelegator service : serviceGroup) {
- serviceCloseFutureList.add(service.closeServiceInstance());
+ try {
+ serviceCloseFutureList.add(service.closeServiceInstance());
+ } catch (final RuntimeException e) {
+ LOG.error("Unexpected exception while closing service: {}, resuming with next..",
+ service.getIdentifier());
+ }
}
+ hasOwnership = false;
}
final ListenableFuture<List<Void>> destroyFuture = Futures.allAsList(serviceCloseFutureList);
- Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock));
+ if (serviceEntityCandidateReg != null) {
+ // we don't want to remove this instance from map
+ Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, false));
+ } else {
+ // we have to remove this ClusterSingletonServiceGroup instance from map
+ Futures.addCallback(destroyFuture, newAsyncCloseCallback(clusterLock, true));
+ }
/*
* We wish to stop all possible EOS activities before we don't close
* a close candidate registration that acts as a guard. So we don't want
* instance has fully closed prior to relinquishing service ownership.
*/
needReleaseLock = false;
- } catch (final Exception e) {
+ } catch (final InterruptedException e) {
LOG.error("Unexpected exception state for service Provider {} in LostLeadership",
clusterSingletonGroupIdentifier, e);
needCloseProviderInstance = true;
/*
* Help method for finalization every acquired functionality
*/
+ @GuardedBy("clusterLock")
private void closeResources(final boolean needReleaseLock, final boolean needCloseProvider) {
- if (needReleaseLock) {
- clusterLock.release();
- }
if (needCloseProvider) {
- final ListenableFuture<List<Void>> closeFutureList = this.closeClusterSingletonGroup();
- Futures.addCallback(closeFutureList, new FutureCallback<List<Void>>() {
-
- @Override
- public void onSuccess(final List<Void> result) {
- removeThisGroupFromProvider(null);
- }
-
- @Override
- public void onFailure(final Throwable t) {
- removeThisGroupFromProvider(t);
- }
- });
+ // The Game Over for this ClusterSingletonServiceGroup instance
+ if (serviceEntityCandidateReg != null) {
+ serviceEntityCandidateReg.close();
+ serviceEntityCandidateReg = null;
+ }
+ // Remove instance immediately because actual state is follower or initialization
+ if (asyncCloseEntityCandidateReg == null) {
+ allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
+ }
}
- }
- /*
- * Help method for final ClusterSingletonGroup removing
- */
- protected final void removeThisGroupFromProvider(@Nullable final Throwable t) {
- LOG.debug("Removing ClusterSingletonServiceGroup {}", clusterSingletonGroupIdentifier);
- if (t != null) {
- LOG.warn("Unexpected problem by closingResources ClusterSingletonServiceGroup {}",
- clusterSingletonGroupIdentifier);
+ if (needReleaseLock) {
+ clusterLock.release();
}
- allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
}
/*
* Help method creates FutureCallback for suspend Future
*/
- private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore) {
- final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable t) -> {
- if (t != null) {
- LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, t);
+ private FutureCallback<List<Void>> newAsyncCloseCallback(@Nullable final Semaphore semaphore,
+ final boolean isInCloseProcess) {
+ final Consumer<Throwable> closeEntityCandidateRegistration = (@Nullable final Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.warn("Unexpected error closing service instance {}", clusterSingletonGroupIdentifier, throwable);
} else {
LOG.debug("Destroy service Instance {} is success", clusterSingletonGroupIdentifier);
}
asyncCloseEntityCandidateReg.close();
asyncCloseEntityCandidateReg = null;
}
+ if (isInCloseProcess) {
+ allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
+ }
if (semaphore != null) {
semaphore.release();
}
- allServiceGroups.remove(clusterSingletonGroupIdentifier, this);
};
return new FutureCallback<List<Void>>() {