2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.lockmanager;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 public class LockManager implements LockManagerService {
40 private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
41 private static final int DEFAULT_RETRY_COUNT = 3;
42 private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
44 private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
45 new ConcurrentHashMap<>();
47 private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
49 private final DataBroker broker;
52 public LockManager(final DataBroker dataBroker) {
53 this.broker = dataBroker;
57 public Future<RpcResult<Void>> lock(LockInput input) {
58 String lockName = input.getLockName();
59 LOG.debug("Locking {}", lockName);
60 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
61 Lock lockData = LockManagerUtils.buildLockData(lockName);
63 getLock(lockInstanceIdentifier, lockData);
64 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
65 LOG.debug("Acquired lock {}", lockName);
66 return Futures.immediateFuture(lockRpcBuilder.build());
67 } catch (InterruptedException e) {
68 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
69 LOG.error("Failed to get lock {}", lockName, e);
70 return Futures.immediateFuture(lockRpcBuilder.build());
75 public Future<RpcResult<Void>> tryLock(TryLockInput input) {
76 String lockName = input.getLockName();
77 LOG.debug("Locking {}", lockName);
78 long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
79 TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
80 : LockManagerUtils.convertToTimeUnit(input.getTimeUnit());
81 waitTime = timeUnit.toMillis(waitTime);
82 long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
83 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
84 Lock lockData = LockManagerUtils.buildLockData(lockName);
86 RpcResultBuilder<Void> lockRpcBuilder;
88 if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
89 lockRpcBuilder = RpcResultBuilder.success();
90 LOG.debug("Acquired lock {}", lockName);
92 lockRpcBuilder = RpcResultBuilder.failed();
93 LOG.error("Failed to get lock {}", lockName);
95 } catch (InterruptedException e) {
96 lockRpcBuilder = RpcResultBuilder.failed();
97 LOG.error("Failed to get lock {}", lockName, e);
99 return Futures.immediateFuture(lockRpcBuilder.build());
103 public Future<RpcResult<Void>> unlock(UnlockInput input) {
104 String lockName = input.getLockName();
105 LOG.debug("Unlocking {}", lockName);
106 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
107 unlock(lockName, lockInstanceIdentifier);
108 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
109 return Futures.immediateFuture(lockRpcBuilder.build());
112 private void unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier) {
113 ReadWriteTransaction tx = broker.newReadWriteTransaction();
115 Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
116 if (!result.isPresent()) {
117 LOG.debug("{} is already unlocked", lockName);
120 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
123 } catch (InterruptedException | ExecutionException e) {
124 LOG.error("In unlock unable to unlock: {}. Reason :", lockName, e);
128 public CompletableFuture<Void> getSynchronizerForLock(String lockName) {
129 return lockSynchronizerMap.get(lockName);
133 * Try to acquire lock indefinitely until it is successful.
135 private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
136 throws InterruptedException {
137 // Count from 1 to provide human-comprehensible messages
138 String lockName = lockData.getLockName();
139 for (int retry = 1;; retry++) {
141 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
142 if (readWriteLock(lockInstanceIdentifier, lockData)) {
145 if (retry >= DEFAULT_NUMBER_LOCKING_ATTEMPS) {
146 LOG.debug("Already locked for {} after waiting {}ms, try {}",
147 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
149 LOG.warn("Already locked for {} after waiting {}ms, try {}",
150 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
153 } catch (ExecutionException e) {
154 LOG.error("Unable to acquire lock for {}, try {}", lockName, retry);
156 CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
157 if (future != null) {
159 // Making this as timed get to avoid any missing signal for lock remove notifications
160 // in LockListener (which does the futue.complete())
161 future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
162 } catch (InterruptedException | ExecutionException e) {
163 LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
164 } catch (TimeoutException e) {
165 LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
167 lockSynchronizerMap.remove(lockName);
173 * Try to acquire lock for mentioned retryCount. Returns true if
174 * successfully acquired lock.
176 private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
177 throws InterruptedException {
178 // Count from 1 to provide human-comprehensible messages
179 String lockName = lockData.getLockName();
180 for (int retry = 1; retry <= retryCount; retry++) {
182 if (readWriteLock(lockInstanceIdentifier, lockData)) {
185 LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
186 DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
188 } catch (ExecutionException e) {
189 LOG.error("Unable to acquire lock for {}, try {} of {}", lockName, retry,
192 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
198 * Read and write the lock immediately if available. Returns true if
199 * successfully locked.
201 private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
202 throws InterruptedException, ExecutionException {
203 String lockName = lockData.getLockName();
204 synchronized (lockName.intern()) {
205 ReadWriteTransaction tx = broker.newReadWriteTransaction();
206 Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
207 if (!result.isPresent()) {
208 LOG.debug("Writing lock lockData {}", lockData);
209 tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);