2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.lockmanager.impl;
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Objects;
15 import java.util.Optional;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.apache.aries.blueprint.annotation.service.Reference;
26 import org.opendaylight.genius.infra.Datastore;
27 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
28 import org.opendaylight.genius.utils.JvmGlobalLocks;
29 import org.opendaylight.infrautils.utils.concurrent.Executors;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
34 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
46 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
// Implementation of the LockManagerService RPCs (lock, tryLock, unlock). Lock entries are read, written
// and deleted in the operational datastore through transactions run by txRunner.
54 public class LockManagerServiceImpl implements LockManagerService {
// Pre-built future holding an empty LockOutput; returned by getLock(Lock) when the lock has been acquired.
55 private static final FluentFuture<LockOutput> IMMEDIATE_LOCK = FluentFutures.immediateFluentFuture(
56 new LockOutputBuilder().build());
// Empty UnlockOutput that the unlock RPC maps its Void transaction result to.
57 private static final UnlockOutput UNLOCK_OUTPUT = new UnlockOutputBuilder().build();
// Pre-built failed tryLock RPC result; returned on interruption and when the lock was not acquired.
58 private static final ListenableFuture<RpcResult<TryLockOutput>> FAILED_TRYLOCK =
59 RpcResultBuilder.<TryLockOutput>failed().buildFuture();
// Pre-built successful tryLock RPC result carrying an empty TryLockOutput.
60 private static final ListenableFuture<RpcResult<TryLockOutput>> SUCCESSFUL_TRYLOCK =
61 RpcResultBuilder.success(new TryLockOutputBuilder().build()).buildFuture();
// Attempt number from which getLock(Lock) logs "Already locked" at warn instead of debug level.
// NOTE(review): "ATTEMPS" looks like a typo for "ATTEMPTS"; renaming is safe only if nothing else refers to it.
63 private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
// Number of tries given to unlock, and the multiplier for the default tryLock wait time.
64 private static final int DEFAULT_RETRY_COUNT = 10;
// Pause between attempts, timed wait on the lock synchronizer, and divisor turning a wait time into a retry count.
65 private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
// Maps a lock name to the future a getLock(Lock) caller waits on between attempts; removeLock looks it up
// when a lock is removed.
67 private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
68 new ConcurrentHashMap<>();
70 private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
71 //TODO: replace with shared executor service once that is available
// Fixed pool of 25 threads; used as the executor for the retry fallback in the private unlock method.
72 private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(25,
73 "LockManagerService", LOG);
// Runs every datastore transaction issued by this class.
75 private final RetryingManagedNewTransactionRunner txRunner;
// Helper that builds Lock objects, owner IDs, instance identifiers and TimeUnit conversions.
76 private final LockManagerUtils lockManagerUtils;
/**
 * Creates the service.
 *
 * @param dataBroker data broker wrapped in the {@link RetryingManagedNewTransactionRunner} that this class
 *        uses for its transactions
 * @param lockManagerUtils helper stored for building locks, owner IDs and instance identifiers
 */
79 public LockManagerServiceImpl(final @Reference DataBroker dataBroker, final LockManagerUtils lockManagerUtils) {
80 this.lockManagerUtils = lockManagerUtils;
81 this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
/**
 * Handles the lock RPC. Builds a {@link Lock} for the requested name with an owner ID obtained from
 * {@code lockManagerUtils.getUniqueID()} and delegates acquisition to {@code getLock(Lock)}, which keeps
 * retrying until the lock is obtained or a non-retryable failure occurs.
 *
 * @param input RPC input carrying the lock name
 * @return the RPC result future produced through {@code FutureRpcResults.fromListenableFuture}
 */
85 public ListenableFuture<RpcResult<LockOutput>> lock(LockInput input) {
86 final Lock lockData = lockManagerUtils.buildLock(input.getLockName(), lockManagerUtils.getUniqueID());
87 return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
88 return getLock(lockData);
/**
 * Handles the tryLock RPC: attempts to acquire the named lock for a bounded number of tries derived from
 * the requested wait time.
 *
 * @param input RPC input carrying the lock name and, optionally, a wait time and its unit
 * @return {@code SUCCESSFUL_TRYLOCK} when the lock was acquired, otherwise {@code FAILED_TRYLOCK}
 *         (also returned when the calling thread is interrupted while trying)
 */
93 public ListenableFuture<RpcResult<TryLockOutput>> tryLock(TryLockInput input) {
94 String lockName = input.getLockName();
95 String owner = lockManagerUtils.getUniqueID();
96 LOG.debug("Locking {}, owner {}" , lockName, owner);
// Without an explicit time the wait defaults to DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT (10000).
97 long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT
98 : input.getTime().toJava();
// Without an explicit unit the wait time is interpreted as milliseconds.
99 TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
100 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
101 waitTime = timeUnit.toMillis(waitTime);
// One attempt per DEFAULT_WAIT_TIME_IN_MILLIS of wait time; integer division truncates.
// NOTE(review): a wait shorter than DEFAULT_WAIT_TIME_IN_MILLIS yields retryCount == 0, and the loop in
// getLock(Lock, long) starts at 1 with "retry <= retryCount", so it would make no attempt at all - confirm
// whether at least one attempt is intended.
102 long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
103 Lock lockData = lockManagerUtils.buildLock(lockName, owner);
105 final boolean success;
107 success = getLock(lockData, retryCount);
108 } catch (InterruptedException e) {
// NOTE(review): the interrupt is reported as a failed tryLock; the thread's interrupt status is not
// restored in the lines visible here.
109 LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
110 return FAILED_TRYLOCK;
114 LOG.debug("Acquired lock {} by owner {}", lockName, owner);
115 return SUCCESSFUL_TRYLOCK;
118 LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
119 return FAILED_TRYLOCK;
/**
 * Handles the unlock RPC: resolves the instance identifier of the named lock and delegates to the private
 * retrying unlock with {@code DEFAULT_RETRY_COUNT} tries, mapping its Void result to {@code UNLOCK_OUTPUT}.
 *
 * @param input RPC input carrying the lock name
 * @return the RPC result future built by {@code FutureRpcResults}
 */
123 public ListenableFuture<RpcResult<UnlockOutput>> unlock(UnlockInput input) {
124 String lockName = input.getLockName();
125 LOG.debug("Unlocking {}", lockName);
126 InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
127 return FutureRpcResults.fromListenableFuture(LOG, input,
// The transform runs on the direct executor, i.e. on whichever thread completes the unlock future.
128 () -> Futures.transform(unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT),
129 unused -> UNLOCK_OUTPUT, MoreExecutors.directExecutor())).build();
/**
 * Deletes the lock entry from the operational datastore, retrying when the transaction fails.
 *
 * @param lockName name of the lock, used for logging
 * @param lockInstanceIdentifier datastore path of the lock entry
 * @param retry remaining tries; the recursive call passes {@code retry - 1}
 * @return future of the delete transaction, with a fallback that logs the failure and retries
 */
132 private ListenableFuture<Void> unlock(final String lockName,
133 final InstanceIdentifier<Lock> lockInstanceIdentifier, final int retry) {
134 ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
// Check inside the same transaction whether the lock entry exists before deleting it.
135 Boolean result = tx.exists(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
137 LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
139 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
// Fallback for any Exception from the transaction future; it runs on EXECUTOR_SERVICE.
142 return Futures.catchingAsync(future, Exception.class, exception -> {
// NOTE(review): only exception.getMessage() is logged, so the stack trace and cause chain are lost.
143 LOG.error("in unlock unable to unlock {} due to {}, try {} of {}", lockName,
144 exception.getMessage(), DEFAULT_RETRY_COUNT - retry + 1, DEFAULT_RETRY_COUNT);
// NOTE(review): the sleep blocks an EXECUTOR_SERVICE thread for the whole back-off period. The condition
// that stops the retries is not visible in this view - confirm the recursion is bounded by "retry".
146 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
147 return unlock(lockName, lockInstanceIdentifier, retry - 1);
151 }, EXECUTOR_SERVICE);
/**
 * Reacts to the removal of a lock: logs it and looks up the future registered for the lock name in
 * {@code lockSynchronizerMap}.
 * NOTE(review): the comment below talks about passing null for a Void type parameter, and getLock(Lock)
 * mentions a future.complete() done for lock remove notifications, so presumably the future found here is
 * completed with null to wake a waiting getLock(Lock) call - the statement is not visible in this view; confirm.
 *
 * @param removedLock the lock that was removed
 */
154 void removeLock(final Lock removedLock) {
155 final String lockName = removedLock.getLockName();
156 LOG.debug("Received remove for lock {} : {}", lockName, removedLock);
// May be null when no getLock(Lock) call has registered a future for this lock name.
157 CompletableFuture<Void> lock = lockSynchronizerMap.get(lockName);
159 // FindBugs flags a false violation here - "passes a null value as the parameter of a method which must be
160 // non-null. Either this parameter has been explicitly marked as @NonNull, or analysis has determined that
161 // this parameter is always dereferenced.". However neither is true. The type param is Void so you have to
// Unbounded variant used by the lock RPC. Each iteration registers a synchronizer future for the lock name,
// tries readWriteLock, and on failure waits up to DEFAULT_WAIT_TIME_IN_MILLIS on that future before the
// next attempt. Returns IMMEDIATE_LOCK on success, or a failed future when the transaction fails with a
// cause other than OptimisticLockFailedException or DataStoreUnavailableException.
168 * Try to acquire lock indefinitely until it is successful.
170 private ListenableFuture<LockOutput> getLock(final Lock lockData)
171 throws InterruptedException {
172 // Count from 1 to provide human-comprehensible messages
173 String lockName = lockData.getLockName();
174 for (int retry = 1;; retry++) {
// Make sure a future exists for removeLock to find; an existing entry is kept.
// NOTE(review): a new CompletableFuture is allocated on every iteration even when an entry already exists;
// computeIfAbsent would avoid the throw-away allocation.
176 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
177 if (readWriteLock(lockData)) {
178 return IMMEDIATE_LOCK;
// Stay at debug level for the first attempts and escalate to warn from DEFAULT_NUMBER_LOCKING_ATTEMPS on.
181 if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
182 LOG.debug("Already locked for {} after waiting {}ms, try {}",
183 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
185 LOG.warn("Already locked for {} after waiting {}ms, try {}",
186 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
188 } catch (ExecutionException e) {
189 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
// Only optimistic-lock conflicts and datastore unavailability are retried; any other cause ends the loop
// with a failed future carrying that cause.
190 if (!(e.getCause() instanceof OptimisticLockFailedException
191 || e.getCause() instanceof DataStoreUnavailableException)) {
192 return Futures.immediateFailedFuture(e.getCause());
195 CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
196 if (future != null) {
198 // Making this as timed get to avoid any missing signal for lock remove notifications
199 // in LockListener (which does the future.complete())
200 future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
// NOTE(review): InterruptedException is caught and only logged here, and the endless loop then continues;
// the interrupt status is not restored in the visible lines although the method declares
// "throws InterruptedException". Consider rethrowing or calling Thread.currentThread().interrupt().
201 } catch (InterruptedException | ExecutionException e) {
202 LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
203 } catch (TimeoutException e) {
204 LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
// Drop the entry so that the next iteration registers a fresh future.
// NOTE(review): the entry is keyed by lock name only, so it may be shared by several callers waiting for
// the same lock; removing it here could drop a future another caller still waits on - confirm.
206 lockSynchronizerMap.remove(lockName);
// Bounded variant used by the tryLock RPC: makes at most retryCount attempts, numbered from 1, and sleeps
// DEFAULT_WAIT_TIME_IN_MILLIS after an unsuccessful one. With retryCount < 1 the loop body never runs.
212 * Try to acquire lock for mentioned retryCount. Returns true if
213 * successfully acquired lock.
215 private boolean getLock(Lock lockData, long retryCount) throws InterruptedException {
216 // Count from 1 to provide human-comprehensible messages
217 String lockName = lockData.getLockName();
218 for (int retry = 1; retry <= retryCount; retry++) {
220 if (readWriteLock(lockData)) {
223 } catch (ExecutionException e) {
// A failed transaction counts as an unsuccessful attempt; it is logged unless caused by an optimistic-lock
// conflict.
224 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
227 LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName, DEFAULT_WAIT_TIME_IN_MILLIS,
// Fixed pause before the next attempt; an interrupt during the sleep propagates to the caller.
229 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
/**
 * Logs a failed lock attempt at error level, unless the failure was caused by an
 * {@link OptimisticLockFailedException}, in which case nothing is logged.
 *
 * @param name name of the lock being acquired
 * @param retry number of the attempt that failed
 * @param exception the failure whose cause is inspected
 */
234 private static void logUnlessCauseIsOptimisticLockFailedException(String name, int retry,
235 ExecutionException exception) {
236 // Log anything else than OptimisticLockFailedException with level error.
237 // Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
238 // and this class is explicitly designed to retry obtaining a lock in case of an
239 // OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
240 // TODO This class may be completely reviewed in the future to work entirely differently;
241 // e.g. using an EntityOwnershipService, as proposed in Bug 8224.
242 if (!(exception.getCause() instanceof OptimisticLockFailedException)) {
243 LOG.error("Unable to acquire lock for {}, try {}", name, retry, exception);
248 * Read and write the lock immediately if available. Returns true if
249 * successfully locked.
251 private boolean readWriteLock(final Lock lockData)
252 throws InterruptedException, ExecutionException {
253 // FIXME: Since netvirt is currently also locking on strings, we need to ensure those places do not synchronize
254 // with us before switching to .getLockFor()
255 final ReentrantLock lock = JvmGlobalLocks.getLockForString(lockData.getLockName());
258 return txRunner.applyWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
259 final InstanceIdentifier<Lock> lockInstanceIdentifier =
260 LockManagerUtils.getLockInstanceIdentifier(lockData.key());
261 Optional<Lock> result = tx.read(lockInstanceIdentifier).get();
262 if (!result.isPresent()) {
263 LOG.debug("Writing lock lockData {}", lockData);
264 tx.put(lockInstanceIdentifier, lockData);
268 String lockDataOwner = result.get().getLockOwner();
269 String currentOwner = lockData.getLockOwner();
270 return Objects.equals(currentOwner, lockDataOwner);