import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
public Future<RpcResult<Void>> lock(LockInput input) {
String lockName = input.getLockName();
String owner = lockManagerUtils.getUniqueID();
- LOG.debug("Locking {}, owner {}" , lockName, owner);
- InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
- Lock lockData = lockManagerUtils.buildLock(lockName, owner);
- try {
- getLock(lockInstanceIdentifier, lockData);
- RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
- LOG.debug("Acquired lock {} by owner {}" , lockName, owner);
- return Futures.immediateFuture(lockRpcBuilder.build());
- } catch (InterruptedException e) {
- RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
- LOG.error("Failed to get lock {} for {}", lockName, owner, e);
- return Futures.immediateFuture(lockRpcBuilder.build());
- }
+ return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
+ InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
+ Lock lockData = lockManagerUtils.buildLock(lockName, owner);
+ return getLock(lockInstanceIdentifier, lockData);
+ }).build();
}
@Override
/**
* Try to acquire lock indefinitely until it is successful.
*/
- private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
+ private ListenableFuture<Void> getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
throws InterruptedException {
// Count from 1 to provide human-comprehensible messages
String lockName = lockData.getLockName();
try {
lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
if (readWriteLock(lockInstanceIdentifier, lockData)) {
- return;
+ return Futures.immediateFuture(null);
} else {
if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
LOG.debug("Already locked for {} after waiting {}ms, try {}",
}
} catch (ExecutionException e) {
logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
+ if (!(e.getCause() instanceof OptimisticLockFailedException)) {
+ return Futures.immediateFailedFuture(e.getCause());
+ }
}
CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
if (future != null) {
private void logUnlessCauseIsOptimisticLockFailedException(String name, int retry, ExecutionException exception) {
// Log anything else than OptimisticLockFailedException with level error.
// Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
- // and this class is explicitly designed to retry obtained a lock in case of an
+ // and this class is explicitly designed to retry obtaining a lock in case of an
// OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
// TODO This class may be completely reviewed in the future to work entirely differently;
// e.g. using an EntityOwnershipService, as proposed in Bug 8224.
*/
package org.opendaylight.genius.lockmanager.tests;
-import static com.google.common.truth.Truth.assertThat;
+import static org.opendaylight.genius.infra.testutils.TestFutureRpcResults.assertRpcErrorCause;
+import static org.opendaylight.genius.infra.testutils.TestFutureRpcResults.assertRpcErrorWithoutCausesOrMessages;
+import static org.opendaylight.genius.infra.testutils.TestFutureRpcResults.assertVoidRpcSuccess;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.junit.Rule;
import org.junit.rules.MethodRule;
import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.genius.datastoreutils.testutils.DataBrokerFailures;
import org.opendaylight.genius.datastoreutils.testutils.DataBrokerFailuresModule;
import org.opendaylight.genius.lockmanager.impl.LockManagerServiceImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Test
public void testLockAndUnLock() throws InterruptedException, ExecutionException, TimeoutException {
LockInput lockInput = new LockInputBuilder().setLockName("testLock").build();
- assertSuccessfulFutureRpcResult(lockManager.lock(lockInput));
+ assertVoidRpcSuccess(lockManager.lock(lockInput));
UnlockInput unlockInput = new UnlockInputBuilder().setLockName("testLock").build();
- assertSuccessfulFutureRpcResult(lockManager.unlock(unlockInput));
+ assertVoidRpcSuccess(lockManager.unlock(unlockInput));
}
@Test
public void testUnLockOfUnknownShouldNotFail() throws InterruptedException, ExecutionException, TimeoutException {
UnlockInput unlockInput = new UnlockInputBuilder().setLockName("unknownLock").build();
- assertSuccessfulFutureRpcResult(lockManager.unlock(unlockInput));
+ assertVoidRpcSuccess(lockManager.unlock(unlockInput));
}
@Test
// lock() RPC will infinitely retry, and it will only come out when the key is unlocked
public void testLockAndReLockSameAgain() throws InterruptedException, ExecutionException, TimeoutException {
LockInput lockInput = new LockInputBuilder().setLockName("testLock").build();
- assertSuccessfulFutureRpcResult(lockManager.lock(lockInput));
+ assertVoidRpcSuccess(lockManager.lock(lockInput));
runUnlockTimerTask("testLock", 3000);
// This will retry infinitely since the other lock is not released!
// After 5 seconds, the parallel thread will unlock the key, and the below TC will pass
- assertSuccessfulFutureRpcResult(lockManager.lock(lockInput));
+ assertVoidRpcSuccess(lockManager.lock(lockInput));
}
@Test
TryLockInput lockInput = new TryLockInputBuilder().setLockName("testTryLock").setTime(3L)
.setTimeUnit(TimeUnits.Seconds).build();
- assertSuccessfulFutureRpcResult(lockManager.tryLock(lockInput));
+ assertVoidRpcSuccess(lockManager.tryLock(lockInput));
// The second acquireLock request will retry for 3 seconds
// and since the first lock is not unlocked, the request will fail.
lockInput = new TryLockInputBuilder().setLockName("testTryLock").setTime(3000L)
.setTimeUnit(TimeUnits.Milliseconds).build();
- assertFailedFutureRpcResult(lockManager.tryLock(lockInput));
+ assertRpcErrorWithoutCausesOrMessages(lockManager.tryLock(lockInput));
// Try to unlock the key in a separate thread before retry expires, and see
// if lock gets acquired.
lockInput = new TryLockInputBuilder().setLockName("testTryLock").setTime(4000000L)
.setTimeUnit(TimeUnits.Microseconds).build();
- assertSuccessfulFutureRpcResult(lockManager.tryLock(lockInput));
+ assertVoidRpcSuccess(lockManager.tryLock(lockInput));
}
@Test
- public void testOptimisticLockFailedException() throws InterruptedException, ExecutionException, TimeoutException {
+ public void test3sOptimisticLockFailedExceptionOnLock()
+ throws InterruptedException, ExecutionException, TimeoutException {
dbFailureSimulator.failSubmits(new OptimisticLockFailedException("bada boum bam!"));
LockInput lockInput = new LockInputBuilder().setLockName("testLock").build();
runUnfailSubmitsTimerTask(3000); // see other tests above
- assertSuccessfulFutureRpcResult(lockManager.lock(lockInput));
+ assertVoidRpcSuccess(lockManager.lock(lockInput));
}
@Test
public void testAskTimeOutException() throws InterruptedException, ExecutionException, TimeoutException {
String lockName = "testLock";
- logCaptureRule.expectError("Unable to acquire lock for " + lockName + ", try 1");
+ logCaptureRule.expectError("Unable to acquire lock for " + lockName + ", try 1", 1);
dbFailureSimulator.failButSubmitsAnyways();
LockInput lockInput = new LockInputBuilder().setLockName(lockName).build();
- assertSuccessfulFutureRpcResult(lockManager.lock(lockInput));
+ assertRpcErrorCause(lockManager.lock(lockInput), TransactionCommitFailedException.class,
+ "caused by simulated AskTimeoutException");
}
- private void assertSuccessfulFutureRpcResult(
- Future<RpcResult<Void>> futureRpcResult) throws InterruptedException, ExecutionException, TimeoutException {
- assertThat(futureRpcResult.get(5, TimeUnit.SECONDS).isSuccessful()).isTrue();
- assertThat(futureRpcResult.get(5, TimeUnit.SECONDS).getErrors()).isEmpty();
+ @Test
+ public void testEternalTransactionCommitFailedExceptionOnLock()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ logCaptureRule.expectError("RPC lock() failed; input = LockInput [_lockName=testLock, augmentation=[]]");
+ dbFailureSimulator.failSubmits(new TransactionCommitFailedException("bada boum bam!"));
+ LockInput lockInput = new LockInputBuilder().setLockName("testLock").build();
+ assertRpcErrorCause(lockManager.lock(lockInput), TransactionCommitFailedException.class, "bada boum bam!");
}
- private void assertFailedFutureRpcResult(
- Future<RpcResult<Void>> futureRpcResult) throws InterruptedException, ExecutionException, TimeoutException {
- assertThat(futureRpcResult.get(5, TimeUnit.SECONDS).isSuccessful()).isFalse();
- }
+ // TODO testEternalReadFailedExceptionOnLock() throws InterruptedException, ExecutionException, TimeoutException {
+
+ // TODO test3sOptimisticLockFailedExceptionOnUnLock()
+ // TODO testEternalReadFailedExceptionOnUnLock()
+ // TODO testEternalTransactionCommitFailedExceptionOnUnLock()
+
+ // TODO test3sOptimisticLockFailedExceptionOnTryLock()
+ // TODO testEternalReadFailedExceptionOnTryLock()
+ // TODO testEternalTransactionCommitFailedExceptionOnTryLock()
private void runUnlockTimerTask(String lockKey, long delay) {
Timer timer = new Timer();
public void run() {
UnlockInput unlockInput = new UnlockInputBuilder().setLockName(lockKey).build();
try {
- assertSuccessfulFutureRpcResult(lockManager.unlock(unlockInput));
+ assertVoidRpcSuccess(lockManager.unlock(unlockInput));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("runUnlockTimerTask() failed", e);
// throw new RuntimeException(e) is useless here, as this in a BG Thread, and it would go nowhere
return getResult(futureRpcResult.get(1, MINUTES));
}
- private static <T> void assertRpcErrorCause(RpcResult<T> rpcResult, Class<?> expectedExceptionClass,
- String expectedRpcErrorMessage) {
+ public static void assertVoidRpcSuccess(Future<RpcResult<Void>> futureRpcResult)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ RpcResult<Void> rpcResult = futureRpcResult.get(1, MINUTES);
+ assertThat(rpcResult.isSuccessful()).isTrue();
+ assertThat(rpcResult.getErrors()).isEmpty();
+ }
+
+ public static <T> void assertRpcErrorWithoutCausesOrMessages(Future<RpcResult<T>> futureRpcResult)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ RpcResult<T> rpcResult = futureRpcResult.get(1, MINUTES);
assertThat(rpcResult.isSuccessful()).named("rpcResult.isSuccessful").isFalse();
- Collection<RpcError> errors = rpcResult.getErrors();
- assertThat(errors).named("rpcResult.errors").hasSize(1);
- RpcError firstError = errors.iterator().next();
- assertThat(firstError.getErrorType()).named("rpcResult.errors[0].errorType").isEqualTo(ErrorType.APPLICATION);
- assertThat(firstError.getCause()).named("rpcResult.errors[0].cause").isInstanceOf(expectedExceptionClass);
- assertThat(firstError.getMessage()).named("rpcResult.errors[0].message").isEqualTo(expectedRpcErrorMessage);
+ assertThat(rpcResult.getErrors()).named("rpcResult.errors").isEmpty();
}
public static <T> void assertRpcErrorCause(Future<RpcResult<T>> futureRpcResult, Class<?> expectedExceptionClass,
assertRpcErrorCause(futureRpcResult.get(1, MINUTES), expectedExceptionClass, expectedRpcErrorMessage);
}
+ private static <T> void assertRpcErrorCause(RpcResult<T> rpcResult, Class<?> expected1stExceptionClass,
+ String expected1stRpcErrorMessage) {
+ assertThat(rpcResult.isSuccessful()).named("rpcResult.isSuccessful").isFalse();
+ Collection<RpcError> errors = rpcResult.getErrors();
+ assertThat(errors).named("rpcResult.errors").hasSize(1);
+ RpcError error1 = errors.iterator().next();
+ assertThat(error1.getErrorType()).named("rpcResult.errors[0].errorType").isEqualTo(ErrorType.APPLICATION);
+ assertThat(error1.getMessage()).named("rpcResult.errors[0].message").isEqualTo(expected1stRpcErrorMessage);
+ if (error1.getCause() != null) {
+ // Check needed because FutureRpcResults does not propagate cause if OperationFailedException
+ assertThat(error1.getCause()).named("rpcResult.errors[0].cause").isInstanceOf(expected1stExceptionClass);
+ }
+ }
+
}