2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.lockmanager;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
35 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
// Implementation of the LockManagerService RPCs (lock, tryLock, unlock) backed by
// lock entries that are read from and written to the OPERATIONAL datastore.
40 public class LockManager implements LockManagerService {
// Per-lock-name futures used as wake-up signals while waiting for a lock:
// getLock(...) inserts an entry with putIfAbsent, waits on it with a timed get,
// and removes it afterwards; getSynchronizerForLock(...) exposes the entry.
42 private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
43 new ConcurrentHashMap<>();
45 private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
// Multiplied with DEFAULT_WAIT_TIME_IN_MILLIS to form tryLock's default total wait
// when the RPC input carries no time value.
47 private static final int DEFAULT_RETRY_COUNT = 3;
// Wait between two acquisition attempts, in milliseconds; tryLock also divides the
// requested total wait by this value to derive its retry count.
48 private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
// Source of the read-write transactions used by unlock(...) and readWriteLock(...).
50 private final DataBroker broker;
// Creates the lock manager and stores the supplied DataBroker in the broker field.
// @param dataBroker broker later used to open read-write transactions
53 public LockManager(final DataBroker dataBroker) {
54 this.broker = dataBroker;
59 LOG.info("{} start", getClass().getSimpleName());
64 LOG.info("{} close", getClass().getSimpleName());
// RPC entry point that acquires the named lock through the two-argument getLock(...),
// which keeps retrying until the lock is obtained, so the calling thread waits inside
// this method.
// @param input RPC input; only input.getLockName() is read
// @return an already-completed future: a successful RpcResult once the lock is held,
//         a failed RpcResult if the wait was interrupted
// NOTE(review): the visible InterruptedException handler logs and returns a failed
// result but does not call Thread.currentThread().interrupt() -- confirm whether the
// interrupt status should be restored for callers.
68 public Future<RpcResult<Void>> lock(LockInput input) {
69 String lockName = input.getLockName();
70 LOG.debug("Locking {}", lockName);
71 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
72 Lock lockData = LockManagerUtils.buildLockData(lockName);
74 getLock(lockInstanceIdentifier, lockData);
75 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
76 LOG.debug("Acquired lock {}", lockName);
77 return Futures.immediateFuture(lockRpcBuilder.build());
78 } catch (InterruptedException e) {
79 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
80 LOG.error("Failed to get lock {}", lockName, e);
81 return Futures.immediateFuture(lockRpcBuilder.build());
// RPC entry point that tries to acquire the named lock for a bounded time, using the
// three-argument getLock(...) with a retry count derived from the requested wait.
// @param input RPC input; reads getLockName(), getTime() and getTimeUnit()
// @return an already-completed future holding a successful RpcResult when getLock
//         reports true, otherwise a failed RpcResult (also on InterruptedException)
86 public Future<RpcResult<Void>> tryLock(TryLockInput input) {
87 String lockName = input.getLockName();
88 LOG.debug("Locking {}", lockName);
// Default total wait is DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT when no time is given.
// NOTE(review): that default is a millisecond figure, yet it is converted below with
// whatever unit the input supplies; a null time combined with a non-millisecond unit
// scales the default -- confirm this is intended.
89 long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
90 TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
91 : LockManagerUtils.convertToTimeUnit(input.getTimeUnit());
92 waitTime = timeUnit.toMillis(waitTime);
// One attempt per DEFAULT_WAIT_TIME_IN_MILLIS of requested wait (integer division).
// NOTE(review): a wait shorter than DEFAULT_WAIT_TIME_IN_MILLIS yields a retry count
// of 0, and getLock's loop condition (retry <= retryCount, starting at 1) then never
// holds -- confirm whether at least one attempt should be made.
93 long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
94 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
95 Lock lockData = LockManagerUtils.buildLockData(lockName);
97 RpcResultBuilder<Void> lockRpcBuilder;
99 if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
100 lockRpcBuilder = RpcResultBuilder.success();
101 LOG.debug("Acquired lock {}", lockName);
103 lockRpcBuilder = RpcResultBuilder.failed();
104 LOG.error("Failed to get lock {}", lockName);
106 } catch (InterruptedException e) {
107 lockRpcBuilder = RpcResultBuilder.failed();
108 LOG.error("Failed to get lock {}", lockName, e);
110 return Futures.immediateFuture(lockRpcBuilder.build());
// RPC entry point that releases the named lock by delegating to the private
// unlock(String, InstanceIdentifier) helper.
// @param input RPC input; only input.getLockName() is read
// @return an already-completed future holding a successful RpcResult
// NOTE(review): the helper returns void and its visible exception handler only logs,
// so this RPC reports success even when the unlock could not be carried out --
// confirm that callers do not rely on the result to detect failures.
114 public Future<RpcResult<Void>> unlock(UnlockInput input) {
115 String lockName = input.getLockName();
116 LOG.debug("Unlocking {}", lockName);
117 InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
118 unlock(lockName, lockInstanceIdentifier);
119 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
120 return Futures.immediateFuture(lockRpcBuilder.build());
// Removes the lock entry for lockName from the OPERATIONAL datastore if it exists.
// @param lockName name of the lock, used for log messages
// @param lockInstanceIdentifier datastore path of the lock entry
// The read is waited on synchronously with get(); InterruptedException and
// ExecutionException are logged by the visible handler.
// NOTE(review): the statements that commit or cancel the transaction are not visible
// in this excerpt -- verify that the transaction is always closed on both branches.
123 private void unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier) {
124 ReadWriteTransaction tx = broker.newReadWriteTransaction();
126 Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
// No entry in the datastore means nobody currently holds this lock.
127 if (!result.isPresent()) {
128 LOG.debug("{} is already unlocked", lockName);
131 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
134 } catch (InterruptedException | ExecutionException e) {
135 LOG.error("In unlock unable to unlock: {}. Reason :", lockName, e);
// Looks up the wait/notify future currently registered for the given lock name.
// @param lockName name of the lock
// @return the future stored in lockSynchronizerMap, or null when no entry exists
//         for that name (ConcurrentHashMap.get semantics)
139 public CompletableFuture<Void> getSynchronizerForLock(String lockName) {
140 return lockSynchronizerMap.get(lockName);
144 * Try to acquire lock indefinitely until it is successful.
// Unbounded acquisition loop: each pass registers a synchronizer future for the lock
// name (if none is present), attempts readWriteLock(...), and otherwise waits on the
// synchronizer for at most DEFAULT_WAIT_TIME_IN_MILLIS before trying again.
// @param lockInstanceIdentifier datastore path of the lock entry
// @param lockData lock entry to write; its lock name keys the synchronizer map
// @throws InterruptedException declared by this method; readWriteLock(...) declares it too
// NOTE(review): the statement that leaves the loop after a successful readWriteLock
// is not visible in this excerpt -- confirm against the full file.
146 private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
147 throws InterruptedException {
148 // Count from 1 to provide human-comprehensible messages
149 String lockName = lockData.getLockName();
// No loop bound: the retry counter only feeds the log messages below.
150 for (int retry = 1;; retry++) {
// Register the synchronizer before attempting the lock so that a future is in the
// map while this thread may be waiting for the lock.
152 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
153 if (readWriteLock(lockInstanceIdentifier, lockData)) {
157 LOG.debug("Already locked for {} after waiting {}ms, try {}",
158 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
160 LOG.warn("Already locked for {} after waiting {}ms, try {}",
161 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
164 } catch (ExecutionException e) {
165 LOG.error("Unable to acquire lock for {}, try {}", lockName, retry);
167 CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
168 if (future != null) {
170 // Making this as timed get to avoid any missing signal for lock remove notifications
171 // in LockListener (which does the future.complete())
172 future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
// NOTE(review): an InterruptedException raised by the wait is logged here rather than
// propagated, although the method declares it -- confirm this is intended.
173 } catch (InterruptedException | ExecutionException e) {
174 LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
175 } catch (TimeoutException e) {
176 LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
// Drop the synchronizer entry; the next loop pass registers a fresh one.
178 lockSynchronizerMap.remove(lockName);
184 * Try to acquire lock for mentioned retryCount. Returns true if
185 * successfully acquired lock.
// Bounded acquisition loop: attempts readWriteLock(...) up to retryCount times and
// sleeps DEFAULT_WAIT_TIME_IN_MILLIS between attempts.
// @param lockInstanceIdentifier datastore path of the lock entry
// @param lockData lock entry to write
// @param retryCount maximum number of attempts; a value below 1 means the loop body
//        is never entered
// @return boolean acquisition result (the return statements are not visible in this excerpt)
// @throws InterruptedException declared by this method; Thread.sleep(...) and
//         readWriteLock(...) both declare it
187 private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
188 throws InterruptedException {
189 // Count from 1 to provide human-comprehensible messages
190 String lockName = lockData.getLockName();
191 for (int retry = 1; retry <= retryCount; retry++) {
193 if (readWriteLock(lockInstanceIdentifier, lockData)) {
196 LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
197 DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
199 } catch (ExecutionException e) {
200 LOG.error("Unable to acquire lock for {}, try {} of {}", lockName, retry,
// Fixed pause before the next attempt; unlike the unbounded variant, this does not
// wait on the synchronizer future.
203 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
209 * Read and write the lock immediately if available. Returns true if
210 * successfully locked.
212 private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
213 throws InterruptedException, ExecutionException {
214 String lockName = lockData.getLockName();
215 synchronized (lockName.intern()) {
216 ReadWriteTransaction tx = broker.newReadWriteTransaction();
217 Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
218 if (!result.isPresent()) {
219 LOG.debug("Writing lock lockData {}", lockData);
220 tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);