Remove start() method
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / LockManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.lockmanager;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
13
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 @Singleton
38 public class LockManager implements LockManagerService {
39
40     private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
41     private static final int DEFAULT_RETRY_COUNT = 3;
42     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
43
44     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
45             new ConcurrentHashMap<>();
46
47     private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
48
49     private final DataBroker broker;
50
51     @Inject
52     public LockManager(final DataBroker dataBroker) {
53         this.broker = dataBroker;
54     }
55
56     @Override
57     public Future<RpcResult<Void>> lock(LockInput input) {
58         String lockName = input.getLockName();
59         LOG.debug("Locking {}", lockName);
60         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
61         Lock lockData = LockManagerUtils.buildLockData(lockName);
62         try {
63             getLock(lockInstanceIdentifier, lockData);
64             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
65             LOG.debug("Acquired lock {}", lockName);
66             return Futures.immediateFuture(lockRpcBuilder.build());
67         } catch (InterruptedException e) {
68             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
69             LOG.error("Failed to get lock {}", lockName, e);
70             return Futures.immediateFuture(lockRpcBuilder.build());
71         }
72     }
73
74     @Override
75     public Future<RpcResult<Void>> tryLock(TryLockInput input) {
76         String lockName = input.getLockName();
77         LOG.debug("Locking {}", lockName);
78         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
79         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
80                 : LockManagerUtils.convertToTimeUnit(input.getTimeUnit());
81         waitTime = timeUnit.toMillis(waitTime);
82         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
83         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
84         Lock lockData = LockManagerUtils.buildLockData(lockName);
85
86         RpcResultBuilder<Void> lockRpcBuilder;
87         try {
88             if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
89                 lockRpcBuilder = RpcResultBuilder.success();
90                 LOG.debug("Acquired lock {}", lockName);
91             } else {
92                 lockRpcBuilder = RpcResultBuilder.failed();
93                 LOG.error("Failed to get lock {}", lockName);
94             }
95         } catch (InterruptedException e) {
96             lockRpcBuilder = RpcResultBuilder.failed();
97             LOG.error("Failed to get lock {}", lockName, e);
98         }
99         return Futures.immediateFuture(lockRpcBuilder.build());
100     }
101
102     @Override
103     public Future<RpcResult<Void>> unlock(UnlockInput input) {
104         String lockName = input.getLockName();
105         LOG.debug("Unlocking {}", lockName);
106         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
107         unlock(lockName, lockInstanceIdentifier);
108         RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
109         return Futures.immediateFuture(lockRpcBuilder.build());
110     }
111
112     private void unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier) {
113         ReadWriteTransaction tx = broker.newReadWriteTransaction();
114         try {
115             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
116             if (!result.isPresent()) {
117                 LOG.debug("{} is already unlocked", lockName);
118                 tx.cancel();
119             } else {
120                 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
121                 tx.submit().get();
122             }
123         } catch (InterruptedException | ExecutionException e) {
124             LOG.error("In unlock unable to unlock: {}. Reason :", lockName, e);
125         }
126     }
127
128     public CompletableFuture<Void> getSynchronizerForLock(String lockName) {
129         return lockSynchronizerMap.get(lockName);
130     }
131
132     /**
133      * Try to acquire lock indefinitely until it is successful.
134      */
135     private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
136             throws InterruptedException {
137         // Count from 1 to provide human-comprehensible messages
138         String lockName = lockData.getLockName();
139         for (int retry = 1;; retry++) {
140             try {
141                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
142                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
143                     return;
144                 } else {
145                     if (retry >= DEFAULT_NUMBER_LOCKING_ATTEMPS) {
146                         LOG.debug("Already locked for {} after waiting {}ms, try {}",
147                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
148                     } else {
149                         LOG.warn("Already locked for {} after waiting {}ms, try {}",
150                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
151                     }
152                 }
153             } catch (ExecutionException e) {
154                 LOG.error("Unable to acquire lock for {}, try {}", lockName, retry);
155             }
156             CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
157             if (future != null) {
158                 try {
159                     // Making this as timed get to avoid any missing signal for lock remove notifications
160                     // in LockListener (which does the futue.complete())
161                     future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
162                 } catch (InterruptedException | ExecutionException e) {
163                     LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
164                 } catch (TimeoutException e) {
165                     LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
166                 }
167                 lockSynchronizerMap.remove(lockName);
168             }
169         }
170     }
171
172     /**
173      * Try to acquire lock for mentioned retryCount. Returns true if
174      * successfully acquired lock.
175      */
176     private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
177             throws InterruptedException {
178         // Count from 1 to provide human-comprehensible messages
179         String lockName = lockData.getLockName();
180         for (int retry = 1; retry <= retryCount; retry++) {
181             try {
182                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
183                     return true;
184                 } else {
185                     LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
186                             DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
187                 }
188             } catch (ExecutionException e) {
189                 LOG.error("Unable to acquire lock for {}, try {} of {}", lockName, retry,
190                         retryCount);
191             }
192             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
193         }
194         return false;
195     }
196
197     /**
198      * Read and write the lock immediately if available. Returns true if
199      * successfully locked.
200      */
201     private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
202             throws InterruptedException, ExecutionException {
203         String lockName = lockData.getLockName();
204         synchronized (lockName.intern()) {
205             ReadWriteTransaction tx = broker.newReadWriteTransaction();
206             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
207             if (!result.isPresent()) {
208                 LOG.debug("Writing lock lockData {}", lockData);
209                 tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);
210                 tx.submit().get();
211                 return true;
212             } else {
213                 tx.cancel();
214                 return false;
215             }
216         }
217     }
218 }