*/
package org.opendaylight.transportpce.common.network;
-import com.google.common.base.Optional;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+ justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
public class RequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
private final DataBroker dataBroker;
- private ReadWriteTransaction rwTx;
- private ReadOnlyTransaction readTx;
- private ReentrantReadWriteLock lock;
-
+ private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
+ private final Lock readL = rwL.readLock();
+ private final Lock writeL = rwL.writeLock();
+ private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
public RequestProcessor(DataBroker dataBroker) {
- this.dataBroker = dataBroker;
- rwTx = dataBroker.newReadWriteTransaction();
- readTx = dataBroker.newReadOnlyTransaction();
- lock = new ReentrantReadWriteLock();
+ this.dataBroker = requireNonNull(dataBroker);
LOG.info("RequestProcessor instantiated");
-
}
- public <T extends DataObject> ListenableFuture<Optional<T>>
- read(LogicalDatastoreType store,InstanceIdentifier<T> path) {
-
- ListenableFuture<Optional<T>> result = null;
- acquireReadLock();
- LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
- result = rwTx.read(store, path);
-
- releaseReadLock();
- return result;
+ public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
+ InstanceIdentifier<T> path) {
+ ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
+ String thread = Thread.currentThread().getName();
+ readL.lock();
+ LOG.debug("read locked {} by {}", store, thread);
+ try {
+ return readTx.read(store, path);
+ }
+ finally {
+ readTx.close();
+ readL.unlock();
+ LOG.debug("read after unlock - {}", thread);
+ }
}
public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
-
- acquireLock();
- LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
- rwTx.delete(store, path);
- }
-
- public <T extends DataObject> void put(LogicalDatastoreType store,
- InstanceIdentifier<T> path, T data, boolean createMissingParents) {
-
- acquireLock();
- LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
- rwTx.put(store, path, data, createMissingParents);
- }
-
- public <T extends DataObject> void put(LogicalDatastoreType store,
- InstanceIdentifier<T> path, T data) {
-
- acquireLock();
- LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
- rwTx.put(store, path, data);
- }
-
-
- public <T extends DataObject> void merge(LogicalDatastoreType store,
- InstanceIdentifier<T> path, T data, boolean createMissingParents) {
-
- acquireLock();
- LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
- rwTx.merge(store, path, data, createMissingParents);
- }
-
- public <T extends DataObject> void merge(LogicalDatastoreType store,
- InstanceIdentifier<T> path, T data) {
-
- acquireLock();
- LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
- rwTx.merge(store, path, data);
- }
-
- public FluentFuture<? extends @NonNull CommitInfo> commit() {
- acquireLock();
- FluentFuture<? extends @NonNull CommitInfo> future = null;
- future = rwTx.commit();
- releaseLock();
- resetRwTx();
- return future;
- }
-
- public void close() {
- releaseLock();
- }
-
- private void acquireLock() {
- if (!lock.writeLock().isHeldByCurrentThread()) {
- lock.writeLock().lock();
- LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
- LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
- rwTx = resetRwTx();
- } else {
- LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+ String thread = Thread.currentThread().getName();
+ LOG.debug("delete - store, thread = {} - {}", store, thread);
+ writeL.lock();
+ LOG.debug("delete locked by {}", thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).delete(store, path);
+ }
+ finally {
+ LOG.debug("delete before unlock - {}", thread);
+ writeL.unlock();
+ LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
+ LOG.debug("delete after unlock2 - {}", thread);
}
}
- private void acquireReadLock() {
- if (lock.getReadHoldCount() > 0) {
- LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
- } else {
- lock.readLock().lock();
- rwTx = resetRwTx();
- LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
+ public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("put locked {} by {}", store, thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).put(store, path, data);
+ }
+ finally {
+ writeL.unlock();
+ LOG.debug("put after unlock - {}", thread);
}
}
- private void releaseLock() {
- if (lock.writeLock().isHeldByCurrentThread()) {
- LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
- lock.writeLock().unlock();
+ public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("merge locked {} by {}", store, thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).merge(store, path, data);
+ }
+ finally {
+ writeL.unlock();
+ LOG.debug("merge after unlock - {}", thread);
}
}
- private void releaseReadLock() {
- LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
- lock.readLock().unlock();
+ public FluentFuture<? extends @NonNull CommitInfo> commit() {
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("commit locked by {}", thread);
+ try {
+ if (writeTrMap.containsKey(thread)) {
+ return writeTrMap.get(thread).commit();
+ } else {
+ LOG.warn("No write transaction available for thread {}", thread);
+ return FluentFutures.immediateNullFluentFuture();
+ }
+ }
+ finally {
+ writeTrMap.remove(thread);
+ writeL.unlock();
+ LOG.debug("commit after unlock - {}", thread);
+ }
}
- private ReadWriteTransaction resetRwTx() {
- LOG.info("Resetting the read write transaction .....");
- rwTx = dataBroker.newReadWriteTransaction();
- return rwTx;
+ /**
+ * Return the dataBroker related to RequestProcessor.
+ * @return the dataBroker
+ */
+ public DataBroker getDataBroker() {
+ return dataBroker;
}
}