1abf602e488d466975b2b3070949f3e95799e703
[genius.git] / lockmanager / lockmanager-impl / src / main / java / org / opendaylight / lockmanager / LockManager.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lockmanager;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockInput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.TryLockInput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.UnlockInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.locks.Lock;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import javax.inject.Inject;
33 import javax.inject.Singleton;
34
35 @Singleton
36 public class LockManager implements LockManagerService {
37     private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
38
39     private static final int DEFAULT_RETRY_COUNT = 3;
40     private static final int DEFAULT_WAIT_TIME_IN_MILLIS = 1000;
41
42     private final DataBroker broker;
43
44     @Inject
45     public LockManager(final DataBroker db) {
46         broker = db;
47     }
48
49     @Override
50     public Future<RpcResult<Void>> lock(LockInput input) {
51         String lockName = input.getLockName();
52         LOG.info("Locking {}" , lockName);
53         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
54         Lock lockData = LockManagerUtils.buildLockData(lockName);
55         try {
56             getLock(lockInstanceIdentifier, lockData);
57             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
58             LOG.info("Acquired lock {}" , lockName);
59             return Futures.immediateFuture(lockRpcBuilder.build());
60         } catch (InterruptedException e) {
61             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
62             LOG.info("Failed to get lock {}" , lockName);
63             return Futures.immediateFuture(lockRpcBuilder.build());
64         }
65     }
66
67     @Override
68     public Future<RpcResult<Void>> tryLock(TryLockInput input) {
69         String lockName = input.getLockName();
70         LOG.info("Locking {}" , lockName);
71         long waitTime = input.getTime() == null ? DEFAULT_WAIT_TIME_IN_MILLIS * DEFAULT_RETRY_COUNT : input.getTime();
72         TimeUnit timeUnit = input.getTimeUnit() == null ? TimeUnit.MILLISECONDS
73                 : LockManagerUtils.convertToTimeUnit(input.getTimeUnit());
74         waitTime = timeUnit.toMillis(waitTime);
75         long retryCount = waitTime / DEFAULT_WAIT_TIME_IN_MILLIS;
76         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
77         Lock lockData = LockManagerUtils.buildLockData(lockName);
78         try {
79             if (getLock(lockInstanceIdentifier, lockData, retryCount)) {
80                 RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
81                 LOG.info("Acquired lock {}" , lockName);
82                 return Futures.immediateFuture(lockRpcBuilder.build());
83             }
84             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
85             LOG.info("Failed to get lock {}" , lockName);
86             return Futures.immediateFuture(lockRpcBuilder.build());
87         } catch (Exception e) {
88             RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.failed();
89             LOG.info("Failed to get lock {}" , lockName);
90             return Futures.immediateFuture(lockRpcBuilder.build());
91         }
92     }
93
94     @Override
95     public Future<RpcResult<Void>> unlock(UnlockInput input) {
96         String lockName = input.getLockName();
97         LOG.info("Unlocking {}" , lockName);
98         InstanceIdentifier<Lock> lockInstanceIdentifier = LockManagerUtils.getLockInstanceIdentifier(lockName);
99         unlock(lockName, lockInstanceIdentifier);
100         RpcResultBuilder<Void> lockRpcBuilder = RpcResultBuilder.success();
101         return Futures.immediateFuture(lockRpcBuilder.build());
102     }
103
104     /**
105      * Try to acquire lock indefinitely until it is successful.
106      */
107     private void getLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
108             throws InterruptedException {
109         // Count from 1 to provide human-comprehensible messages
110         for (int retry = 1; ; retry++) {
111             try {
112                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
113                     return;
114                 } else {
115                     LOG.debug("Already locked after waiting {}ms, try {}", DEFAULT_WAIT_TIME_IN_MILLIS, retry);
116                 }
117             } catch (ExecutionException e) {
118                 LOG.error("Unable to acquire lock, try {}", retry, e);
119             }
120             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
121         }
122     }
123
124     /**
125      * Try to acquire lock for mentioned retryCount. Returns true if successfully acquired lock.
126      */
127     private boolean getLock(InstanceIdentifier<Lock> lockInstanceIdentifier,
128             Lock lockData, long retryCount) throws InterruptedException {
129         // Count from 1 to provide human-comprehensible messages
130         for (int retry = 1; retry <= retryCount; retry++) {
131             try {
132                 if (readWriteLock(lockInstanceIdentifier, lockData)) {
133                     return true;
134                 } else {
135                     LOG.debug("Already locked after waiting {}ms, try {} of {}", DEFAULT_WAIT_TIME_IN_MILLIS, retry,
136                             retryCount);
137                 }
138             } catch (ExecutionException e) {
139                 LOG.error("Unable to acquire lock, try {} of {}", retry, retryCount, e);
140             }
141             Thread.sleep(DEFAULT_WAIT_TIME_IN_MILLIS);
142         }
143         return false;
144     }
145
146     /**
147      * Read and write the lock immediately if available. Returns true if successfully locked.
148      */
149     private boolean readWriteLock(final InstanceIdentifier<Lock> lockInstanceIdentifier, final Lock lockData)
150             throws InterruptedException, ExecutionException {
151         ReadWriteTransaction tx = broker.newReadWriteTransaction();
152         Optional<Lock> result = Optional.absent();
153         result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
154         if (!result.isPresent()) {
155             tx.put(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier, lockData, true);
156             CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
157             futures.get();
158             return true;
159         }
160         if (result.get().getLockOwner() == Thread.currentThread().getName()) {
161             return true;
162         }
163         return false;
164     }
165
166     private void unlock(final String lockName, final InstanceIdentifier<Lock> lockInstanceIdentifier) {
167         ReadWriteTransaction tx = broker.newReadWriteTransaction();
168         Optional<Lock> result = Optional.absent();
169         try {
170             result = tx.read(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier).get();
171             if (!result.isPresent()) {
172                 LOG.info("{} is already unlocked", lockName);
173                 return;
174             }
175             tx.delete(LogicalDatastoreType.OPERATIONAL, lockInstanceIdentifier);
176             CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
177             futures.get();
178         } catch (Exception e) {
179             LOG.error("In unlock unable to unlock due to {}", e.getMessage());
180         }
181     }
182
183
184 }