GENIUS-86 : LockManager fixes
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / impl / LockManagerServiceImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.lockmanager.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
13
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
32 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
33 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
35 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 @Singleton
40 public class LockManagerServiceImpl implements LockManagerService {
41
42     private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
43     private static final int DEFAULT_RETRY_COUNT = 3;
44     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
45
46     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
47             new ConcurrentHashMap<>();
48
49     private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
50
51     private final DataBroker broker;
52     private final LockManagerUtils lockManagerUtils;
53
54     @Inject
55     public LockManagerServiceImpl(final DataBroker dataBroker, final LockManagerUtils lockManagerUtils) {
56         this.broker = dataBroker;
57         this.lockManagerUtils = lockManagerUtils;
58     }
59
60     @Override
61     public Future<RpcResult<Void>> lock(LockInput input) {
62         String lockName = input.getLockName();
63         String owner = lockManagerUtils.getUniqueID();
64         LOG.debug("Locking {}, owner {}" , lockName, owner);
65         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
66         Lock lockData = lockManagerUtils.buildLock(lockName, owner);
67         try {
68             getLock(lockInstanceIdentifier, lockData);
69             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
70             LOG.debug("Acquired lock {} by owner {}" , lockName, owner);
71             return Futures.immediateFuture(lockRpcBuilder.build());
72         } catch (InterruptedException e) {
73             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
74             LOG.error("Failed to get lock {} for {}", lockName, owner, e);
75             return Futures.immediateFuture(lockRpcBuilder.build());
76         }
77     }
78
79     @Override
80     public Future<RpcResult<Void>> tryLock(TryLockInput input) {
81         String lockName = input.getLockName();
82         String owner = lockManagerUtils.getUniqueID();
83         LOG.debug("Locking {}, owner {}" , lockName, owner);
84         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
85         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
86                 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
87         waitTime = timeUnit.toMillis(waitTime);
88         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
89         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
90         Lock lockData = lockManagerUtils.buildLock(lockName, owner);
91
92         RpcResultBuilder<Void> lockRpcBuilder;
93         try {
94             if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
95                 lockRpcBuilder = RpcResultBuilder.success();
96                 LOG.debug("Acquired lock {} by owner {}", lockName, owner);
97             } else {
98                 lockRpcBuilder = RpcResultBuilder.failed();
99                 LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
100             }
101         } catch (InterruptedException e) {
102             lockRpcBuilder = RpcResultBuilder.failed();
103             LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
104         }
105         return Futures.immediateFuture(lockRpcBuilder.build());
106     }
107
108     @Override
109     public Future<RpcResult<Void>> unlock(UnlockInput input) {
110         String lockName = input.getLockName();
111         LOG.debug("Unlocking {}", lockName);
112         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
113         RpcResultBuilder<Void> lockRpcBuilder = unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT);
114         return Futures.immediateFuture(lockRpcBuilder.build());
115     }
116
117     private RpcResultBuilder<Void> unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier,
118             int retry) {
119         RpcResultBuilder<Void> lockRpcBuilder;
120         try {
121             ReadWriteTransaction tx = broker.newReadWriteTransaction();
122             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
123             if (!result.isPresent()) {
124                 LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
125                 tx.cancel();
126             } else {
127                 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
128                 tx.submit().get();
129             }
130             lockRpcBuilder = RpcResultBuilder.success();
131         } catch (InterruptedException | ExecutionException e) {
132             LOG.warn("In unlock unable to unlock {} due to {}, retryCount {}", lockName, e.getMessage(), retry);
133             // try to unlock again
134             if (retry > 0) {
135                 lockRpcBuilder = unlock(lockName, lockInstanceIdentifier, --retry);
136             } else {
137                 lockRpcBuilder = RpcResultBuilder.failed();
138                 lockRpcBuilder.withError(ErrorType.APPLICATION, "unlock() failed: " + lockName, e);
139             }
140         }
141         return lockRpcBuilder;
142     }
143
144     public CompletableFuture<Void> getSynchronizerForLock(String lockName) {
145         return lockSynchronizerMap.get(lockName);
146     }
147
148     /**
149      * Try to acquire lock indefinitely until it is successful.
150      */
151     private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
152             throws InterruptedException {
153         // Count from 1 to provide human-comprehensible messages
154         String lockName = lockData.getLockName();
155         for (int retry = 1;; retry++) {
156             try {
157                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
158                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
159                     return;
160                 } else {
161                     if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
162                         LOG.debug("Already locked for {} after waiting {}ms, try {}",
163                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
164                     } else {
165                         LOG.warn("Already locked for {} after waiting {}ms, try {}",
166                                 lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
167                     }
168                 }
169             } catch (ExecutionException e) {
170                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
171             }
172             CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
173             if (future != null) {
174                 try {
175                     // Making this as timed get to avoid any missing signal for lock remove notifications
176                     // in LockListener (which does the future.complete())
177                     future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
178                 } catch (InterruptedException | ExecutionException e) {
179                     LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
180                 } catch (TimeoutException e) {
181                     LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
182                 }
183                 lockSynchronizerMap.remove(lockName);
184             }
185         }
186     }
187
188     /**
189      * Try to acquire lock for mentioned retryCount. Returns true if
190      * successfully acquired lock.
191      */
192     private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier, Lock lockData, long retryCount)
193             throws InterruptedException {
194         // Count from 1 to provide human-comprehensible messages
195         String lockName = lockData.getLockName();
196         for (int retry = 1; retry <= retryCount; retry++) {
197             try {
198                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
199                     return true;
200                 } else {
201                     LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName,
202                             DEFAULT_WAIT_TIME_IN_MILLIS, retry, retryCount);
203                 }
204             } catch (ExecutionException e) {
205                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
206             }
207             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
208         }
209         return false;
210     }
211
212     private void logUnlessCauseIsOptimisticLockFailedException(String name, int retry, ExecutionException exception) {
213         // Log anything else than OptimisticLockFailedException with level error.
214         // Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
215         //           and this class is explicitly designed to retry obtained a lock in case of an
216         //           OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
217         // TODO This class may be completely reviewed in the future to work entirely differently;
218         //      e.g. using an EntityOwnershipService, as proposed in Bug 8224.
219         if (!(exception.getCause() instanceof OptimisticLockFailedException)) {
220             LOG.error("Unable to acquire lock for {}, try {}", name, retry, exception);
221         }
222     }
223
224     /**
225      * Read and write the lock immediately if available. Returns true if
226      * successfully locked.
227      */
228     private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
229             throws InterruptedException, ExecutionException {
230         String lockName = lockData.getLockName();
231         synchronized (lockName.intern()) {
232             ReadWriteTransaction tx = broker.newReadWriteTransaction();
233             Optional<Lock> result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
234             if (!result.isPresent()) {
235                 LOG.debug("Writing lock lockData {}", lockData);
236                 tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);
237                 tx.submit().get();
238                 return true;
239             } else {
240                 String lockDataOwner = result.get().getLockOwner();
241                 String currentOwner = lockData.getLockOwner();
242                 if (currentOwner.equals(lockDataOwner)) {
243                     return true;
244                 }
245             }
246             tx.cancel();
247             return false;
248         }
249     }
250 }