Bump version odlparent->6.0.0,mdsal->5.0.3
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / impl / LockManagerServiceImpl.java
index deb84bb04f0f80a7116a82b2904f7ad1859a1a7f..e210b423024efc99f5b20a980ecbe2246501b29f 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.genius.lockmanager.impl;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -15,6 +16,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
@@ -22,21 +24,26 @@ import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.aries.blueprint.annotation.service.Reference;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.genius.infra.Datastore;
 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
 import org.opendaylight.genius.utils.JvmGlobalLocks;
-import org.opendaylight.serviceutils.tools.mdsal.rpc.FutureRpcResults;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -45,15 +52,25 @@ import org.slf4j.LoggerFactory;
 
 @Singleton
 public class LockManagerServiceImpl implements LockManagerService {
+    private static final FluentFuture<LockOutput> IMMEDIATE_LOCK = FluentFutures.immediateFluentFuture(
+        new LockOutputBuilder().build());
+    private static final UnlockOutput UNLOCK_OUTPUT = new UnlockOutputBuilder().build();
+    private static final ListenableFuture<RpcResult<TryLockOutput>> FAILED_TRYLOCK =
+            RpcResultBuilder.<TryLockOutput>failed().buildFuture();
+    private static final ListenableFuture<RpcResult<TryLockOutput>> SUCCESSFUL_TRYLOCK =
+            RpcResultBuilder.success(new TryLockOutputBuilder().build()).buildFuture();
 
     private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
-    private static final int DEFAULT_RETRY_COUNT = 3;
+    private static final int DEFAULT_RETRY_COUNT = 10;
     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
 
     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
             new ConcurrentHashMap<>();
 
     private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
+    //TODO: replace with shared executor service once that is available
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(25,
+            "LockManagerService", LOG);
 
     private final RetryingManagedNewTransactionRunner txRunner;
     private final LockManagerUtils lockManagerUtils;
@@ -66,12 +83,9 @@ public class LockManagerServiceImpl implements LockManagerService {
 
     @Override
     public ListenableFuture<RpcResult<LockOutput>> lock(LockInput input) {
-        String lockName = input.getLockName();
-        String owner = lockManagerUtils.getUniqueID();
+        final Lock lockData = lockManagerUtils.buildLock(input.getLockName(), lockManagerUtils.getUniqueID());
         return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
-            InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
-            Lock lockData = lockManagerUtils.buildLock(lockName, owner);
-            return getLock(lockInstanceIdentifier, lockData);
+            return getLock(lockData);
         }).build();
     }
 
@@ -80,28 +94,29 @@ public class LockManagerServiceImpl implements LockManagerService {
         String lockName = input.getLockName();
         String owner = lockManagerUtils.getUniqueID();
         LOG.debug("Locking {}, owner {}" , lockName, owner);
-        long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
+        long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT
+                                                : input.getTime().toJava();
         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
                 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
         waitTime = timeUnit.toMillis(waitTime);
         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
-        InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
         Lock lockData = lockManagerUtils.buildLock(lockName, owner);
 
-        RpcResultBuilder<TryLockOutput> lockRpcBuilder;
+        final boolean success;
         try {
-            if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
-                lockRpcBuilder = RpcResultBuilder.success();
-                LOG.debug("Acquired lock {} by owner {}", lockName, owner);
-            } else {
-                lockRpcBuilder = RpcResultBuilder.failed();
-                LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
-            }
+            success = getLock(lockData, retryCount);
         } catch (InterruptedException e) {
-            lockRpcBuilder = RpcResultBuilder.failed();
             LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
+            return FAILED_TRYLOCK;
         }
-        return Futures.immediateFuture(lockRpcBuilder.build());
+
+        if (success) {
+            LOG.debug("Acquired lock {} by owner {}", lockName, owner);
+            return SUCCESSFUL_TRYLOCK;
+        }
+
+        LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
+        return FAILED_TRYLOCK;
     }
 
     @Override
@@ -110,14 +125,30 @@ public class LockManagerServiceImpl implements LockManagerService {
         LOG.debug("Unlocking {}", lockName);
         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
         return FutureRpcResults.fromListenableFuture(LOG, input,
-            () -> Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
-                Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
-                if (!result.isPresent()) {
-                    LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
-                } else {
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
-                }
-            }), unused -> new UnlockOutputBuilder().build(), MoreExecutors.directExecutor())).build();
+            () -> Futures.transform(unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT),
+                unused -> UNLOCK_OUTPUT, MoreExecutors.directExecutor())).build();
+    }
+
+    private ListenableFuture<Void> unlock(final String lockName,
+        final InstanceIdentifier<Lock> lockInstanceIdentifier, final int retry) {
+        ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
+            if (!result.isPresent()) {
+                LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
+            } else {
+                tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
+            }
+        });
+        return Futures.catchingAsync(future, Exception.class, exception -> {
+            LOG.error("in unlock unable to unlock {} due to {}, try {} of {}", lockName,
+                exception.getMessage(), DEFAULT_RETRY_COUNT - retry + 1, DEFAULT_RETRY_COUNT);
+            if (retry - 1 > 0) {
+                Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
+                return unlock(lockName, lockInstanceIdentifier, retry - 1);
+            } else {
+                throw exception;
+            }
+        }, EXECUTOR_SERVICE);
     }
 
     void removeLock(final Lock removedLock) {
@@ -126,7 +157,7 @@ public class LockManagerServiceImpl implements LockManagerService {
         CompletableFuture<Void> lock = lockSynchronizerMap.get(lockName);
         if (lock != null) {
             // FindBugs flags a false violation here - "passes a null value as the parameter of a method which must be
-            // non-null. Either this parameter has been explicitly marked as @Nonnull, or analysis has determined that
+            // non-null. Either this parameter has been explicitly marked as @NonNull, or analysis has determined that
             // this parameter is always dereferenced.". However neither is true. The type param is Void so you have to
             // pas null.
             lock.complete(null);
@@ -136,28 +167,28 @@ public class LockManagerServiceImpl implements LockManagerService {
     /**
      * Try to acquire lock indefinitely until it is successful.
      */
-    private ListenableFuture<LockOutput> getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier,
-                                                 final Lock lockData)
+    private ListenableFuture<LockOutput> getLock(final Lock lockData)
             throws InterruptedException {
         // Count from 1 to provide human-comprehensible messages
         String lockName = lockData.getLockName();
         for (int retry = 1;; retry++) {
             try {
                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
-                if (readWriteLock(lockInstanceIdentifier, lockData)) {
-                    return Futures.immediateFuture(null);
+                if (readWriteLock(lockData)) {
+                    return IMMEDIATE_LOCK;
+                }
+
+                if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
+                    LOG.debug("Already locked for {} after waiting {}ms, try {}",
+                        lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
                 } else {
-                    if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
-                        LOG.debug("Already locked for {} after waiting {}ms, try {}",
-                                lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
-                    } else {
-                        LOG.warn("Already locked for {} after waiting {}ms, try {}",
-                                lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
-                    }
+                    LOG.warn("Already locked for {} after waiting {}ms, try {}",
+                        lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
                 }
             } catch (ExecutionException e) {
                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
-                if (!(e.getCause() instanceof OptimisticLockFailedException)) {
+                if (!(e.getCause() instanceof OptimisticLockFailedException
+                    || e.getCause() instanceof DataStoreUnavailableException)) {
                     return Futures.immediateFailedFuture(e.getCause());
                 }
             }
@@ -181,21 +212,20 @@ public class LockManagerServiceImpl implements LockManagerService {
      * Try to acquire lock for mentioned retryCount. Returns true if
      * successfully acquired lock.
      */
-    private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
-            throws InterruptedException {
+    private boolean getLock(Lock lockData, long retryCount) throws InterruptedException {
         // Count from 1 to provide human-comprehensible messages
         String lockName = lockData.getLockName();
         for (int retry = 1; retry <= retryCount; retry++) {
             try {
-                if (readWriteLock(lockInstanceIdentifier, lockData)) {
+                if (readWriteLock(lockData)) {
                     return true;
-                } else {
-                    LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
-                            DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
                 }
             } catch (ExecutionException e) {
                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
             }
+
+            LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName, DEFAULT_WAIT_TIME_IN_MILLIS,
+                retry, retryCount);
             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
         }
         return false;
@@ -218,7 +248,7 @@ public class LockManagerServiceImpl implements LockManagerService {
      * Read and write the lock immediately if available. Returns true if
      * successfully locked.
      */
-    private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
+    private boolean readWriteLock(final Lock lockData)
             throws InterruptedException, ExecutionException {
         // FIXME: Since netvirt is currently also locking on strings, we need to ensure those places do not synchronize
         //        with us before switching to .getLockFor()
@@ -226,10 +256,12 @@ public class LockManagerServiceImpl implements LockManagerService {
         lock.lock();
         try {
             return txRunner.applyWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
+                final InstanceIdentifier<Lock> lockInstanceIdentifier =
+                        LockManagerUtils.getLockInstanceIdentifier(lockData.key());
                 Optional<Lock> result = tx.read(lockInstanceIdentifier).get();
                 if (!result.isPresent()) {
                     LOG.debug("Writing lock lockData {}", lockData);
-                    tx.put(lockInstanceIdentifier, lockData, true);
+                    tx.put(lockInstanceIdentifier, lockData);
                     return true;
                 }