MDSAL-API Migration
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / genius / lockmanager / impl / LockManagerServiceImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.lockmanager.impl;
9
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Objects;
15 import java.util.Optional;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.ReentrantLock;
23 import javax.inject.Inject;
24 import javax.inject.Singleton;
25 import org.apache.aries.blueprint.annotation.service.Reference;
26 import org.opendaylight.genius.infra.Datastore;
27 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
28 import org.opendaylight.genius.utils.JvmGlobalLocks;
29 import org.opendaylight.infrautils.utils.concurrent.Executors;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
34 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockOutputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockOutputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
46 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 @Singleton
54 public class LockManagerServiceImpl implements LockManagerService {
55     private static final FluentFuture<LockOutput> IMMEDIATE_LOCK = FluentFutures.immediateFluentFuture(
56         new LockOutputBuilder().build());
57     private static final UnlockOutput UNLOCK_OUTPUT = new UnlockOutputBuilder().build();
58     private static final ListenableFuture<RpcResult<TryLockOutput>> FAILED_TRYLOCK =
59             RpcResultBuilder.<TryLockOutput>failed().buildFuture();
60     private static final ListenableFuture<RpcResult<TryLockOutput>> SUCCESSFUL_TRYLOCK =
61             RpcResultBuilder.success(new TryLockOutputBuilder().build()).buildFuture();
62
63     private static final int DEFAULT_NUMBER_LOCKING_ATTEMPS = 30;
64     private static final int DEFAULT_RETRY_COUNT = 10;
65     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
66
67     private final ConcurrentHashMap<String, CompletableFuture<Void>> lockSynchronizerMap =
68             new ConcurrentHashMap<>();
69
70     private static final Logger LOG = LoggerFactory.getLogger(LockManagerServiceImpl.class);
71     //TODO: replace with shared executor service once that is available
72     private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(25,
73             "LockManagerService", LOG);
74
75     private final RetryingManagedNewTransactionRunner txRunner;
76     private final LockManagerUtils lockManagerUtils;
77
78     @Inject
79     public LockManagerServiceImpl(final @Reference DataBroker dataBroker, final LockManagerUtils lockManagerUtils) {
80         this.lockManagerUtils = lockManagerUtils;
81         this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
82     }
83
84     @Override
85     public ListenableFuture<RpcResult<LockOutput>> lock(LockInput input) {
86         final Lock lockData = lockManagerUtils.buildLock(input.getLockName(), lockManagerUtils.getUniqueID());
87         return FutureRpcResults.fromListenableFuture(LOG, input, () -> {
88             return getLock(lockData);
89         }).build();
90     }
91
92     @Override
93     public ListenableFuture<RpcResult<TryLockOutput>> tryLock(TryLockInput input) {
94         String lockName = input.getLockName();
95         String owner = lockManagerUtils.getUniqueID();
96         LOG.debug("Locking {}, owner {}" , lockName, owner);
97         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT
98                                                 : input.getTime().toJava();
99         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
100                 : lockManagerUtils.convertToTimeUnit(input.getTimeUnit());
101         waitTime = timeUnit.toMillis(waitTime);
102         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
103         Lock lockData = lockManagerUtils.buildLock(lockName, owner);
104
105         final boolean success;
106         try {
107             success = getLock(lockData, retryCount);
108         } catch (InterruptedException e) {
109             LOG.error("Failed to get lock {} owner {}", lockName, owner, e);
110             return FAILED_TRYLOCK;
111         }
112
113         if (success) {
114             LOG.debug("Acquired lock {} by owner {}", lockName, owner);
115             return SUCCESSFUL_TRYLOCK;
116         }
117
118         LOG.error("Failed to get lock {} owner {} after {} retries", lockName, owner, retryCount);
119         return FAILED_TRYLOCK;
120     }
121
122     @Override
123     public ListenableFuture<RpcResult<UnlockOutput>> unlock(UnlockInput input) {
124         String lockName = input.getLockName();
125         LOG.debug("Unlocking {}", lockName);
126         InstanceIdentifier<Lock> lockInstanceIdentifier = lockManagerUtils.getLockInstanceIdentifier(lockName);
127         return FutureRpcResults.fromListenableFuture(LOG, input,
128             () -> Futures.transform(unlock(lockName, lockInstanceIdentifier, DEFAULT_RETRY_COUNT),
129                 unused -> UNLOCK_OUTPUT, MoreExecutors.directExecutor())).build();
130     }
131
132     private ListenableFuture<Void> unlock(final String lockName,
133         final InstanceIdentifier<Lock> lockInstanceIdentifier, final int retry) {
134         ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
135             Boolean result = tx.exists(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
136             if (!result) {
137                 LOG.debug("unlock ignored, as unnecessary; lock is already unlocked: {}", lockName);
138             } else {
139                 tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
140             }
141         });
142         return Futures.catchingAsync(future, Exception.class, exception -> {
143             LOG.error("in unlock unable to unlock {} due to {}, try {} of {}", lockName,
144                 exception.getMessage(), DEFAULT_RETRY_COUNT - retry + 1, DEFAULT_RETRY_COUNT);
145             if (retry - 1 > 0) {
146                 Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
147                 return unlock(lockName, lockInstanceIdentifier, retry - 1);
148             } else {
149                 throw exception;
150             }
151         }, EXECUTOR_SERVICE);
152     }
153
154     void removeLock(final Lock removedLock) {
155         final String lockName = removedLock.getLockName();
156         LOG.debug("Received remove for lock {} : {}", lockName, removedLock);
157         CompletableFuture<Void> lock = lockSynchronizerMap.get(lockName);
158         if (lock != null) {
159             // FindBugs flags a false violation here - "passes a null value as the parameter of a method which must be
160             // non-null. Either this parameter has been explicitly marked as @NonNull, or analysis has determined that
161             // this parameter is always dereferenced.". However neither is true. The type param is Void so you have to
162             // pas null.
163             lock.complete(null);
164         }
165     }
166
167     /**
168      * Try to acquire lock indefinitely until it is successful.
169      */
170     private ListenableFuture<LockOutput> getLock(final Lock lockData)
171             throws InterruptedException {
172         // Count from 1 to provide human-comprehensible messages
173         String lockName = lockData.getLockName();
174         for (int retry = 1;; retry++) {
175             try {
176                 lockSynchronizerMap.putIfAbsent(lockName, new CompletableFuture<>());
177                 if (readWriteLock(lockData)) {
178                     return IMMEDIATE_LOCK;
179                 }
180
181                 if (retry < DEFAULT_NUMBER_LOCKING_ATTEMPS) {
182                     LOG.debug("Already locked for {} after waiting {}ms, try {}",
183                         lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
184                 } else {
185                     LOG.warn("Already locked for {} after waiting {}ms, try {}",
186                         lockName, DEFAULT_WAIT_TIME_IN_MILLIS, retry);
187                 }
188             } catch (ExecutionException e) {
189                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
190                 if (!(e.getCause() instanceof OptimisticLockFailedException
191                     || e.getCause() instanceof DataStoreUnavailableException)) {
192                     return Futures.immediateFailedFuture(e.getCause());
193                 }
194             }
195             CompletableFuture<Void> future = lockSynchronizerMap.get(lockName);
196             if (future != null) {
197                 try {
198                     // Making this as timed get to avoid any missing signal for lock remove notifications
199                     // in LockListener (which does the future.complete())
200                     future.get(DEFAULT_WAIT_TIME_IN_MILLIS, TimeUnit.MILLISECONDS);
201                 } catch (InterruptedException | ExecutionException e) {
202                     LOG.error("Problems in waiting on lock synchronizer {}", lockName, e);
203                 } catch (TimeoutException e) {
204                     LOG.info("Waiting for the lock {} is timed out. retrying again", lockName);
205                 }
206                 lockSynchronizerMap.remove(lockName);
207             }
208         }
209     }
210
211     /**
212      * Try to acquire lock for mentioned retryCount. Returns true if
213      * successfully acquired lock.
214      */
215     private boolean getLock(Lock lockData, long retryCount) throws InterruptedException {
216         // Count from 1 to provide human-comprehensible messages
217         String lockName = lockData.getLockName();
218         for (int retry = 1; retry <= retryCount; retry++) {
219             try {
220                 if (readWriteLock(lockData)) {
221                     return true;
222                 }
223             } catch (ExecutionException e) {
224                 logUnlessCauseIsOptimisticLockFailedException(lockName, retry, e);
225             }
226
227             LOG.debug("Already locked for {} after waiting {}ms, try {} of {}", lockName, DEFAULT_WAIT_TIME_IN_MILLIS,
228                 retry, retryCount);
229             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
230         }
231         return false;
232     }
233
234     private static void logUnlessCauseIsOptimisticLockFailedException(String name, int retry,
235             ExecutionException exception) {
236         // Log anything else than OptimisticLockFailedException with level error.
237         // Bug 8059: We do not log OptimisticLockFailedException, as those are "normal" in the current design,
238         //           and this class is explicitly designed to retry obtaining a lock in case of an
239         //           OptimisticLockFailedException, so we do not flood the log with events in case it's "just" that.
240         // TODO This class may be completely reviewed in the future to work entirely differently;
241         //      e.g. using an EntityOwnershipService, as proposed in Bug 8224.
242         if (!(exception.getCause() instanceof OptimisticLockFailedException)) {
243             LOG.error("Unable to acquire lock for {}, try {}", name, retry, exception);
244         }
245     }
246
247     /**
248      * Read and write the lock immediately if available. Returns true if
249      * successfully locked.
250      */
251     private boolean readWriteLock(final Lock lockData)
252             throws InterruptedException, ExecutionException {
253         // FIXME: Since netvirt is currently also locking on strings, we need to ensure those places do not synchronize
254         //        with us before switching to .getLockFor()
255         final ReentrantLock lock = JvmGlobalLocks.getLockForString(lockData.getLockName());
256         lock.lock();
257         try {
258             return txRunner.applyWithNewReadWriteTransactionAndSubmit(Datastore.OPERATIONAL, tx -> {
259                 final InstanceIdentifier<Lock> lockInstanceIdentifier =
260                         LockManagerUtils.getLockInstanceIdentifier(lockData.key());
261                 Optional<Lock> result = tx.read(lockInstanceIdentifier).get();
262                 if (!result.isPresent()) {
263                     LOG.debug("Writing lock lockData {}", lockData);
264                     tx.put(lockInstanceIdentifier, lockData);
265                     return true;
266                 }
267
268                 String lockDataOwner = result.get().getLockOwner();
269                 String currentOwner = lockData.getLockOwner();
270                 return Objects.equals(currentOwner, lockDataOwner);
271             }).get();
272         } finally {
273             lock.unlock();
274         }
275     }
276 }