Use mdsal-binding-util to get ManagedNewTransactionRunner
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / impl / LockManagerServiceImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.lockmanager.impl;
9
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Objects;
15 import java.util.Optional;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.apache.aries.blueprint.annotation.service.Reference;
26 import org.opendaylight.genius.utils.JvmGlobalLocks;
27 import org.opendaylight.infrautils.utils.concurrent.Executors;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.util.Datastore;
30 import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
31 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
32 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
33 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
45 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 @Singleton
53 public class LockManagerServiceImpl implements LockManagerService {
54     private static final FluentFuture<LockOutput> IMMEDIATE_LOCK = FluentFutures.immediateFluentFuture(
55         new LockOutputBuilder().build());
56     private static final UnlockOutput UNLOCK_OUTPUT = new UnlockOutputBuilder().build();
57     private static final ListenableFuture<RpcResult<TryLockOutput>> FAILED_TRYLOCK =
58             RpcResultBuilder.<TryLockOutput>failed().buildFuture();
59     private static final ListenableFuture<RpcResult<TryLockOutput>> SUCCESSFUL_TRYLOCK =
60             RpcResultBuilder.success(new TryLockOutputBuilder().build()).buildFuture();
61
62     private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
63     private static final int DEFAULT_RETRY_COUNT = 10;
64     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
65
66     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
67             new ConcurrentHashMap<>();
68
69     private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
70     //TODO: replace with shared executor service once that is available
71     private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(25,
72             "LockManagerService", LOG);
73
74     private final RetryingManagedNewTransactionRunner txRunner;
75     private final LockManagerUtils lockManagerUtils;
76
77     @Inject
78     public LockManagerServiceImpl(final @Reference DataBroker dataBroker, final LockManagerUtils lockManagerUtils) {
79         this.lockManagerUtils = lockManagerUtils;
80         this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
81     }
82
83     @Override
84     public ListenableFuture<RpcResult<LockOutput>> lock(LockInput input) {
85         final Lock lockData = lockManagerUtils.buildLock(input.getLockName(), lockManagerUtils.getUniqueID());
86         return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
87             return getLock(lockData);
88         }).build();
89     }
90
91     @Override
92     public ListenableFuture<RpcResult<TryLockOutput>> tryLock(TryLockInput input) {
93         String lockName = input.getLockName();
94         String owner = lockManagerUtils.getUniqueID();
95         LOG.debug("Locking {}, owner {}" , lockName, owner);
96         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT
97                                                 : input.getTime().toJava();
98         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
99                 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
100         waitTime = timeUnit.toMillis(waitTime);
101         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
102         Lock lockData = lockManagerUtils.buildLock(lockName, owner);
103
104         final boolean success;
105         try {
106             success = getLock(lockData, retryCount);
107         } catch (InterruptedException e) {
108             LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
109             return FAILED_TRYLOCK;
110         }
111
112         if (success) {
113             LOG.debug("Acquired lock {} by owner {}", lockName, owner);
114             return SUCCESSFUL_TRYLOCK;
115         }
116
117         LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
118         return FAILED_TRYLOCK;
119     }
120
121     @Override
122     public ListenableFuture<RpcResult<UnlockOutput>> unlock(UnlockInput input) {
123         String lockName = input.getLockName();
124         LOG.debug("Unlocking {}", lockName);
125         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
126         return FutureRpcResults.fromListenableFuture(LOG, input,
127             () -> Futures.transform(unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT),
128                 unused -> UNLOCK_OUTPUT, MoreExecutors.directExecutor())).build();
129     }
130
131     private ListenableFuture<?> unlock(final String lockName,
132         final InstanceIdentifier<Lock> lockInstanceIdentifier, final int retry) {
133         ListenableFuture<?> future = txRunner.callWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
134             Boolean result = tx.exists(lockInstanceIdentifier).get();
135             if (!result) {
136                 LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
137             } else {
138                 tx.delete(lockInstanceIdentifier);
139             }
140         });
141         return Futures.catchingAsync(future, Exception.class, exception -> {
142             LOG.error("in unlock unable to unlock {} due to {}, try {} of {}", lockName,
143                 exception.getMessage(), DEFAULT_RETRY_COUNT - retry + 1, DEFAULT_RETRY_COUNT);
144             if (retry - 1 > 0) {
145                 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
146                 return (ListenableFuture) unlock(lockName, lockInstanceIdentifier, retry - 1);
147             } else {
148                 throw exception;
149             }
150         }, EXECUTOR_SERVICE);
151     }
152
153     void removeLock(final Lock removedLock) {
154         final String lockName = removedLock.getLockName();
155         LOG.debug("Received remove for lock {} : {}", lockName, removedLock);
156         CompletableFuture<Void> lock = lockSynchronizerMap.get(lockName);
157         if (lock != null) {
158             // FindBugs flags a false violation here - "passes a null value as the parameter of a method which must be
159             // non-null. Either this parameter has been explicitly marked as @NonNull, or analysis has determined that
160             // this parameter is always dereferenced.". However neither is true. The type param is Void so you have to
161             // pas null.
162             lock.complete(null);
163         }
164     }
165
166     /**
167      * Try to acquire lock indefinitely until it is successful.
168      */
169     private ListenableFuture<LockOutput> getLock(final Lock lockData)
170             throws InterruptedException {
171         // Count from 1 to provide human-comprehensible messages
172         String lockName = lockData.getLockName();
173         for (int retry = 1;; retry++) {
174             try {
175                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
176                 if (readWriteLock(lockData)) {
177                     return IMMEDIATE_LOCK;
178                 }
179
180                 if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
181                     LOG.debug("Already locked for {} after waiting {}ms, try {}",
182                         lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
183                 } else {
184                     LOG.warn("Already locked for {} after waiting {}ms, try {}",
185                         lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
186                 }
187             } catch (ExecutionException e) {
188                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
189                 if (!(e.getCause() instanceof OptimisticLockFailedException
190                     || e.getCause() instanceof DataStoreUnavailableException)) {
191                     return Futures.immediateFailedFuture(e.getCause());
192                 }
193             }
194             CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
195             if (future != null) {
196                 try {
197                     // Making this as timed get to avoid any missing signal for lock remove notifications
198                     // in LockListener (which does the future.complete())
199                     future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
200                 } catch (InterruptedException | ExecutionException e) {
201                     LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
202                 } catch (TimeoutException e) {
203                     LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
204                 }
205                 lockSynchronizerMap.remove(lockName);
206             }
207         }
208     }
209
210     /**
211      * Try to acquire lock for mentioned retryCount. Returns true if
212      * successfully acquired lock.
213      */
214     private boolean getLock(Lock lockData, long retryCount) throws InterruptedException {
215         // Count from 1 to provide human-comprehensible messages
216         String lockName = lockData.getLockName();
217         for (int retry = 1; retry <= retryCount; retry++) {
218             try {
219                 if (readWriteLock(lockData)) {
220                     return true;
221                 }
222             } catch (ExecutionException e) {
223                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
224             }
225
226             LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName, DEFAULT_WAIT_TIME_IN_MILLIS,
227                 retry, retryCount);
228             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
229         }
230         return false;
231     }
232
233     private static void logUnlessCauseIsOptimisticLockFailedException(String name, int retry,
234             ExecutionException exception) {
235         // Log anything else than OptimisticLockFailedException with level error.
236         // Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
237         //           and this class is explicitly designed to retry obtaining a lock in case of an
238         //           OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
239         // TODO This class may be completely reviewed in the future to work entirely differently;
240         //      e.g. using an EntityOwnershipService, as proposed in Bug 8224.
241         if (!(exception.getCause() instanceof OptimisticLockFailedException)) {
242             LOG.error("Unable to acquire lock for {}, try {}", name, retry, exception);
243         }
244     }
245
246     /**
247      * Read and write the lock immediately if available. Returns true if
248      * successfully locked.
249      */
250     private boolean readWriteLock(final Lock lockData)
251             throws InterruptedException, ExecutionException {
252         // FIXME: Since netvirt is currently also locking on strings, we need to ensure those places do not synchronize
253         //        with us before switching to .getLockFor()
254         final ReentrantLock lock = JvmGlobalLocks.getLockForString(lockData.getLockName());
255         lock.lock();
256         try {
257             return txRunner.applyWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
258                 final InstanceIdentifier<Lock> lockInstanceIdentifier =
259                         LockManagerUtils.getLockInstanceIdentifier(lockData.key());
260                 Optional<Lock> result = tx.read(lockInstanceIdentifier).get();
261                 if (!result.isPresent()) {
262                     LOG.debug("Writing lock lockData {}", lockData);
263                     tx.put(lockInstanceIdentifier, lockData);
264                     return true;
265                 }
266
267                 String lockDataOwner = result.get().getLockOwner();
268                 String currentOwner = lockData.getLockOwner();
269                 return Objects.equals(currentOwner, lockDataOwner);
270             }).get();
271         } finally {
272             lock.unlock();
273         }
274     }
275 }