2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.lockmanager.impl;
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Objects;
15 import java.util.Optional;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.apache.aries.blueprint.annotation.service.Reference;
26 import org.opendaylight.genius.utils.JvmGlobalLocks;
27 import org.opendaylight.infrautils.utils.concurrent.Executors;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.util.Datastore;
30 import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
31 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
32 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
33 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
45 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 public class LockManagerServiceImpl implements LockManagerService {
54 private static final FluentFuture<LockOutput> IMMEDIATE_LOCK = FluentFutures.immediateFluentFuture(
55 new LockOutputBuilder().build());
56 private static final UnlockOutput UNLOCK_OUTPUT = new UnlockOutputBuilder().build();
57 private static final ListenableFuture<RpcResult<TryLockOutput>> FAILED_TRYLOCK =
58 RpcResultBuilder.<TryLockOutput>failed().buildFuture();
59 private static final ListenableFuture<RpcResult<TryLockOutput>> SUCCESSFUL_TRYLOCK =
60 RpcResultBuilder.success(new TryLockOutputBuilder().build()).buildFuture();
62 private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
63 private static final int DEFAULT_RETRY_COUNT = 10;
64 private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
66 private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
67 new ConcurrentHashMap<>();
69 private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
70 //TODO: replace with shared executor service once that is available
71 private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(25,
72 "LockManagerService", LOG);
74 private final RetryingManagedNewTransactionRunner txRunner;
75 private final LockManagerUtils lockManagerUtils;
/**
 * Creates the lock manager service.
 *
 * @param dataBroker broker wrapped in a {@link RetryingManagedNewTransactionRunner} for all datastore access
 * @param lockManagerUtils helper used to build lock data and instance identifiers
 */
public LockManagerServiceImpl(final @Reference DataBroker dataBroker, final LockManagerUtils lockManagerUtils) {
    txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
    this.lockManagerUtils = lockManagerUtils;
}
84 public ListenableFuture<RpcResult<LockOutput>> lock(LockInput input) {
85 final Lock lockData = lockManagerUtils.buildLock(input.getLockName(), lockManagerUtils.getUniqueID());
86 return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
87 return getLock(lockData);
92 public ListenableFuture<RpcResult<TryLockOutput>> tryLock(TryLockInput input) {
93 String lockName = input.getLockName();
94 String owner = lockManagerUtils.getUniqueID();
95 LOG.debug("Locking {}, owner {}" , lockName, owner);
96 long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT
97 : input.getTime().toJava();
98 TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
99 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
100 waitTime = timeUnit.toMillis(waitTime);
101 long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
102 Lock lockData = lockManagerUtils.buildLock(lockName, owner);
104 final boolean success;
106 success = getLock(lockData, retryCount);
107 } catch (InterruptedException e) {
108 LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
109 return FAILED_TRYLOCK;
113 LOG.debug("Acquired lock {} by owner {}", lockName, owner);
114 return SUCCESSFUL_TRYLOCK;
117 LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
118 return FAILED_TRYLOCK;
122 public ListenableFuture<RpcResult<UnlockOutput>> unlock(UnlockInput input) {
123 String lockName = input.getLockName();
124 LOG.debug("Unlocking {}", lockName);
125 InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
126 return FutureRpcResults.fromListenableFuture(LOG, input,
127 () -> Futures.transform(unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT),
128 unused -> UNLOCK_OUTPUT, MoreExecutors.directExecutor())).build();
/**
 * Removes the datastore entry of a lock, retrying on failure.
 *
 * NOTE(review): the embedded line numbers of this listing have gaps inside this method (135, 137,
 * 139-140, 144, 147-149), so the branch conditions and the way retries terminate are not visible
 * here - confirm them against the complete file before changing this method.
 *
 * @param lockName name of the lock, used in log messages
 * @param lockInstanceIdentifier datastore path of the lock entry
 * @param retry retry counter; the recursive call below passes {@code retry - 1}
 * @return future for the transaction, with failures routed through the retry handler
 */
131 private ListenableFuture<?> unlock(final String lockName,
132 final InstanceIdentifier<Lock> lockInstanceIdentifier, final int retry) {
// Read-write transaction on the OPERATIONAL datastore: first ask whether the lock entry exists.
133 ListenableFuture<?> future = txRunner.callWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
134 Boolean result = tx.exists(lockInstanceIdentifier).get();
// Presumably logged when the entry does not exist (the guarding condition is not visible) - TODO confirm.
136 LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
// Presumably the alternative branch: delete the existing lock entry - TODO confirm.
138 tx.delete(lockInstanceIdentifier);
// Any Exception from the transaction future is handled here, on EXECUTOR_SERVICE.
141 return Futures.catchingAsync(future, Exception.class, exception -> {
// The try number is derived from how much of DEFAULT_RETRY_COUNT has been used so far.
142 LOG.error("in unlock unable to unlock {} due to {}, try {} of {}", lockName,
143 exception.getMessage(), DEFAULT_RETRY_COUNT - retry + 1, DEFAULT_RETRY_COUNT);
// Pause on the executor thread, then recurse with one retry fewer.
145 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
146 return (ListenableFuture) unlock(lockName, lockInstanceIdentifier, retry - 1);
150 }, EXECUTOR_SERVICE);
/**
 * Handles the removal of a lock entry: logs it and looks up the future registered for the lock name
 * in {@code lockSynchronizerMap}.
 *
 * NOTE(review): the statements that act on the looked-up future are not visible in this listing
 * (embedded lines 157 and 161-163 are missing). A comment in getLock mentions a future.complete()
 * performed for lock remove notifications, so this is presumably where waiters are released -
 * confirm against the complete file.
 *
 * @param removedLock the lock entry that was removed
 */
153 void removeLock(final Lock removedLock) {
154 final String lockName = removedLock.getLockName();
155 LOG.debug("Received remove for lock {} : {}", lockName, removedLock);
// May be null when no future is currently registered for this lock name.
156 CompletableFuture<Void> lock = lockSynchronizerMap.get(lockName);
158 // FindBugs flags a false violation here - "passes a null value as the parameter of a method which must be
159 // non-null. Either this parameter has been explicitly marked as @NonNull, or analysis has determined that
160 // this parameter is always dereferenced.". However neither is true. The type param is Void so you have to
167 * Try to acquire lock indefinitely until it is successful.
169 private ListenableFuture<LockOutput> getLock(final Lock lockData)
170 throws InterruptedException {
171 // Count from 1 to provide human-comprehensible messages
172 String lockName = lockData.getLockName();
173 for (int retry = 1;; retry++) {
175 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
176 if (readWriteLock(lockData)) {
177 return IMMEDIATE_LOCK;
180 if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
181 LOG.debug("Already locked for {} after waiting {}ms, try {}",
182 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
184 LOG.warn("Already locked for {} after waiting {}ms, try {}",
185 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
187 } catch (ExecutionException e) {
188 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
189 if (!(e.getCause() instanceof OptimisticLockFailedException
190 || e.getCause() instanceof DataStoreUnavailableException)) {
191 return Futures.immediateFailedFuture(e.getCause());
194 CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
195 if (future != null) {
197 // Making this as timed get to avoid any missing signal for lock remove notifications
198 // in LockListener (which does the future.complete())
199 future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
200 } catch (InterruptedException | ExecutionException e) {
201 LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
202 } catch (TimeoutException e) {
203 LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
205 lockSynchronizerMap.remove(lockName);
211 * Try to acquire lock for mentioned retryCount. Returns true if
212 * successfully acquired lock.
214 private boolean getLock(Lock lockData, long retryCount) throws InterruptedException {
215 // Count from 1 to provide human-comprehensible messages
216 String lockName = lockData.getLockName();
217 for (int retry = 1; retry <= retryCount; retry++) {
219 if (readWriteLock(lockData)) {
222 } catch (ExecutionException e) {
223 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
226 LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName, DEFAULT_WAIT_TIME_IN_MILLIS,
228 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
233 private static void logUnlessCauseIsOptimisticLockFailedException(String name, int retry,
234 ExecutionException exception) {
235 // Log anything else than OptimisticLockFailedException with level error.
236 // Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
237 // and this class is explicitly designed to retry obtaining a lock in case of an
238 // OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
239 // TODO This class may be completely reviewed in the future to work entirely differently;
240 // e.g. using an EntityOwnershipService, as proposed in Bug 8224.
241 if (!(exception.getCause() instanceof OptimisticLockFailedException)) {
242 LOG.error("Unable to acquire lock for {}, try {}", name, retry, exception);
247 * Read and write the lock immediately if available. Returns true if
248 * successfully locked.
// Single lock attempt: reads the lock entry in a read-write transaction on the OPERATIONAL datastore
// and writes it when it is absent.
// NOTE(review): this listing omits several lines of the method (embedded lines 255-256, 264-266 and
// everything after 269), including whatever is done with the ReentrantLock obtained below and the
// value returned after the write - confirm against the complete file.
250 private boolean readWriteLock(final Lock lockData)
251 throws InterruptedException, ExecutionException {
252 // FIXME: Since netvirt is currently also locking on strings, we need to ensure those places do not synchronize
253 // with us before switching to .getLockFor()
// Lock object obtained from JvmGlobalLocks for this lock name.
254 final ReentrantLock lock = JvmGlobalLocks.getLockForString(lockData.getLockName());
257 return txRunner.applyWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
258 final InstanceIdentifier<Lock> lockInstanceIdentifier =
259 LockManagerUtils.getLockInstanceIdentifier(lockData.key());
// Read the current entry; an empty Optional means no lock entry is stored at that path.
260 Optional<Lock> result = tx.read(lockInstanceIdentifier).get();
261 if (!result.isPresent()) {
262 LOG.debug("Writing lock lockData {}", lockData);
263 tx.put(lockInstanceIdentifier, lockData);
// An entry already exists: the result is true only when its owner equals the requested owner.
267 String lockDataOwner = result.get().getLockOwner();
268 String currentOwner = lockData.getLockOwner();
269 return Objects.equals(currentOwner, lockDataOwner);