Bug 9034: LockManager cancel() ReadWriteTransaction if it's not submit()
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / LockManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.lockmanager;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
13
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
35 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 @Singleton
40 public class LockManager implements LockManagerService {
41
42     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
43             new ConcurrentHashMap<>();
44
45     private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
46
47     private static final int DEFAULT_RETRY_COUNT = 3;
48     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
49
50     private final DataBroker broker;
51
52     @Inject
53     public LockManager(final DataBroker dataBroker) {
54         this.broker = dataBroker;
55     }
56
57     @PostConstruct
58     public void start() {
59         LOG.info("{} start", getClass().getSimpleName());
60     }
61
62     @PreDestroy
63     public void close() {
64         LOG.info("{} close", getClass().getSimpleName());
65     }
66
67     @Override
68     public Future<RpcResult<Void>> lock(LockInput input) {
69         String lockName = input.getLockName();
70         LOG.debug("Locking {}", lockName);
71         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
72         Lock lockData = LockManagerUtils.buildLockData(lockName);
73         try {
74             getLock(lockInstanceIdentifier, lockData);
75             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
76             LOG.debug("Acquired lock {}", lockName);
77             return Futures.immediateFuture(lockRpcBuilder.build());
78         } catch (InterruptedException e) {
79             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
80             LOG.error("Failed to get lock {}", lockName, e);
81             return Futures.immediateFuture(lockRpcBuilder.build());
82         }
83     }
84
85     @Override
86     public Future<RpcResult<Void>> tryLock(TryLockInput input) {
87         String lockName = input.getLockName();
88         LOG.debug("Locking {}", lockName);
89         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
90         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
91                 : LockManagerUtils.convertToTimeUnit(input.getTimeUnit());
92         waitTime = timeUnit.toMillis(waitTime);
93         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
94         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
95         Lock lockData = LockManagerUtils.buildLockData(lockName);
96
97         RpcResultBuilder<Void> lockRpcBuilder;
98         try {
99             if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
100                 lockRpcBuilder = RpcResultBuilder.success();
101                 LOG.debug("Acquired lock {}", lockName);
102             } else {
103                 lockRpcBuilder = RpcResultBuilder.failed();
104                 LOG.error("Failed to get lock {}", lockName);
105             }
106         } catch (InterruptedException e) {
107             lockRpcBuilder = RpcResultBuilder.failed();
108             LOG.error("Failed to get lock {}", lockName, e);
109         }
110         return Futures.immediateFuture(lockRpcBuilder.build());
111     }
112
113     @Override
114     public Future<RpcResult<Void>> unlock(UnlockInput input) {
115         String lockName = input.getLockName();
116         LOG.debug("Unlocking {}", lockName);
117         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
118         unlock(lockName, lockInstanceIdentifier);
119         RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
120         return Futures.immediateFuture(lockRpcBuilder.build());
121     }
122
123     private void unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier) {
124         ReadWriteTransaction tx = broker.newReadWriteTransaction();
125         try {
126             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
127             if (!result.isPresent()) {
128                 LOG.debug("{} is already unlocked", lockName);
129                 tx.cancel();
130             } else {
131                 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
132                 tx.submit().get();
133             }
134         } catch (InterruptedException | ExecutionException e) {
135             LOG.error("In unlock unable to unlock: {}. Reason :", lockName, e);
136         }
137     }
138
139     public CompletableFuture<Void> getSynchronizerForLock(String lockName) {
140         return lockSynchronizerMap.get(lockName);
141     }
142
143     /**
144      * Try to acquire lock indefinitely until it is successful.
145      */
146     private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
147             throws InterruptedException {
148         // Count from 1 to provide human-comprehensible messages
149         String lockName = lockData.getLockName();
150         for (int retry = 1;; retry++) {
151             try {
152                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
153                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
154                     return;
155                 } else {
156                     if (retry >= 30) {
157                         LOG.debug("Already locked for {} after waiting {}ms, try {}",
158                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
159                     } else {
160                         LOG.warn("Already locked for {} after waiting {}ms, try {}",
161                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
162                     }
163                 }
164             } catch (ExecutionException e) {
165                 LOG.error("Unable to acquire lock for {}, try {}", lockName, retry);
166             }
167             CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
168             if (future != null) {
169                 try {
170                     // Making this as timed get to avoid any missing signal for lock remove notifications
171                     // in LockListener (which does the futue.complete())
172                     future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
173                 } catch (InterruptedException | ExecutionException e) {
174                     LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
175                 } catch (TimeoutException e) {
176                     LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
177                 }
178                 lockSynchronizerMap.remove(lockName);
179             }
180         }
181     }
182
183     /**
184      * Try to acquire lock for mentioned retryCount. Returns true if
185      * successfully acquired lock.
186      */
187     private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
188             throws InterruptedException {
189         // Count from 1 to provide human-comprehensible messages
190         String lockName = lockData.getLockName();
191         for (int retry = 1; retry <= retryCount; retry++) {
192             try {
193                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
194                     return true;
195                 } else {
196                     LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
197                             DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
198                 }
199             } catch (ExecutionException e) {
200                 LOG.error("Unable to acquire lock for {}, try {} of {}", lockName, retry,
201                         retryCount);
202             }
203             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
204         }
205         return false;
206     }
207
208     /**
209      * Read and write the lock immediately if available. Returns true if
210      * successfully locked.
211      */
212     private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
213             throws InterruptedException, ExecutionException {
214         String lockName = lockData.getLockName();
215         synchronized (lockName.intern()) {
216             ReadWriteTransaction tx = broker.newReadWriteTransaction();
217             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
218             if (!result.isPresent()) {
219                 LOG.debug("Writing lock lockData {}", lockData);
220                 tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);
221                 tx.submit().get();
222                 return true;
223             } else {
224                 tx.cancel();
225                 return false;
226             }
227         }
228     }
229 }