Declare SuppressFBWarnings in imports
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / network / RequestProcessor.java
index 9ce82ed1717e4947e4be4b815e867fe05b74afc6..8a4780ca104bb107ab4ede0c0f88cb070e966322 100644 (file)
  */
 package org.opendaylight.transportpce.common.network;
 
-import com.google.common.base.Optional;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+    justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
 public class RequestProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
 
     private final DataBroker dataBroker;
-    private ReadWriteTransaction rwTx;
-    private ReadOnlyTransaction readTx;
-    private ReentrantReadWriteLock lock;
-
+    private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
+    private final Lock readL = rwL.readLock();
+    private final Lock writeL = rwL.writeLock();
+    private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
 
 
     public RequestProcessor(DataBroker dataBroker) {
-        this.dataBroker = dataBroker;
-        rwTx = dataBroker.newReadWriteTransaction();
-        readTx = dataBroker.newReadOnlyTransaction();
-        lock = new ReentrantReadWriteLock();
+        this.dataBroker = requireNonNull(dataBroker);
         LOG.info("RequestProcessor instantiated");
-
     }
 
-    public <T extends DataObject> ListenableFuture<Optional<T>>
-         read(LogicalDatastoreType store,InstanceIdentifier<T> path) {
-
-        ListenableFuture<Optional<T>> result = null;
-        acquireReadLock();
-        LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
-        result = rwTx.read(store, path);
-
-        releaseReadLock();
-        return result;
+    public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
+            InstanceIdentifier<T> path) {
+        ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
+        String thread = Thread.currentThread().getName();
+        readL.lock();
+        LOG.debug("read locked {} by {}", store, thread);
+        try {
+            return readTx.read(store, path);
+        }
+        finally {
+            readTx.close();
+            readL.unlock();
+            LOG.debug("read after unlock - {}", thread);
+        }
     }
 
     public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
-
-        acquireLock();
-        LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.delete(store, path);
-    }
-
-    public <T extends DataObject> void put(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data, boolean createMissingParents) {
-
-        acquireLock();
-        LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.put(store, path, data, createMissingParents);
-    }
-
-    public <T extends DataObject> void put(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.put(store, path, data);
-    }
-
-
-    public <T extends DataObject> void merge(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data, boolean createMissingParents) {
-
-        acquireLock();
-        LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.merge(store, path, data, createMissingParents);
-    }
-
-    public <T extends DataObject> void merge(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.merge(store, path, data);
-    }
-
-    public FluentFuture<? extends @NonNull CommitInfo> commit() {
-        acquireLock();
-        FluentFuture<? extends @NonNull CommitInfo> future = null;
-        future = rwTx.commit();
-        releaseLock();
-        resetRwTx();
-        return future;
-    }
-
-    public void close() {
-        releaseLock();
-    }
-
-    private void acquireLock() {
-        if (!lock.writeLock().isHeldByCurrentThread()) {
-            lock.writeLock().lock();
-            LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
-            LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
-            rwTx = resetRwTx();
-        } else {
-            LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+        String thread = Thread.currentThread().getName();
+        LOG.debug("delete - store, thread = {} - {}", store, thread);
+        writeL.lock();
+        LOG.debug("delete locked by {}", thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).delete(store, path);
+        }
+        finally {
+            LOG.debug("delete before unlock - {}", thread);
+            writeL.unlock();
+            LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
+            LOG.debug("delete after unlock2 - {}", thread);
         }
     }
 
-    private void acquireReadLock() {
-        if (lock.getReadHoldCount() > 0) {
-            LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
-        } else {
-            lock.readLock().lock();
-            rwTx = resetRwTx();
-            LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
+    public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("put locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).put(store, path, data);
+        }
+        finally {
+            writeL.unlock();
+            LOG.debug("put after unlock - {}", thread);
         }
     }
 
-    private void releaseLock() {
-        if (lock.writeLock().isHeldByCurrentThread()) {
-            LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
-            lock.writeLock().unlock();
+    public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("merge locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).merge(store, path, data);
+        }
+        finally {
+            writeL.unlock();
+            LOG.debug("merge after unlock - {}", thread);
         }
     }
 
-    private void releaseReadLock() {
-        LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
-        lock.readLock().unlock();
+    public FluentFuture<? extends @NonNull CommitInfo> commit() {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("commit locked by {}", thread);
+        try {
+            if (writeTrMap.containsKey(thread)) {
+                return writeTrMap.get(thread).commit();
+            } else {
+                LOG.warn("No write transaction available for thread {}", thread);
+                return FluentFutures.immediateNullFluentFuture();
+            }
+        }
+        finally {
+            writeTrMap.remove(thread);
+            writeL.unlock();
+            LOG.debug("commit after unlock - {}", thread);
+        }
     }
 
-    private ReadWriteTransaction resetRwTx() {
-        LOG.info("Resetting the read write transaction .....");
-        rwTx = dataBroker.newReadWriteTransaction();
-        return rwTx;
+    /**
+     * Return the dataBroker related to RequestProcessor.
+     * @return the dataBroker
+     */
+    public DataBroker getDataBroker() {
+        return dataBroker;
     }
 }