Declare SuppressFBWarnings in imports
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / network / RequestProcessor.java
index b58961c8e998d19e4e5f0ca50f6c41c8bff2c89e..8a4780ca104bb107ab4ede0c0f88cb070e966322 100644 (file)
@@ -11,122 +11,126 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+    justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
 public class RequestProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
 
     private final DataBroker dataBroker;
-    private ReadWriteTransaction rwTx;
-    private ReentrantReadWriteLock lock;
-
+    private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
+    private final Lock readL = rwL.readLock();
+    private final Lock writeL = rwL.writeLock();
+    private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
 
 
     public RequestProcessor(DataBroker dataBroker) {
         this.dataBroker = requireNonNull(dataBroker);
-        rwTx = dataBroker.newReadWriteTransaction();
-        lock = new ReentrantReadWriteLock();
         LOG.info("RequestProcessor instantiated");
-
     }
 
     public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
             InstanceIdentifier<T> path) {
-
-        ListenableFuture<Optional<T>> result = null;
-        acquireReadLock();
-        LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
-        result = rwTx.read(store, path);
-
-        releaseReadLock();
-        return result;
+        ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
+        String thread = Thread.currentThread().getName();
+        readL.lock();
+        LOG.debug("read locked {} by {}", store, thread);
+        try {
+            return readTx.read(store, path);
+        }
+        finally {
+            readTx.close();
+            readL.unlock();
+            LOG.debug("read after unlock - {}", thread);
+        }
     }
 
     public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
-
-        acquireLock();
-        LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.delete(store, path);
+        String thread = Thread.currentThread().getName();
+        LOG.debug("delete - store, thread = {} - {}", store, thread);
+        writeL.lock();
+        LOG.debug("delete locked by {}", thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).delete(store, path);
+        }
+        finally {
+            LOG.debug("delete before unlock - {}", thread);
+            writeL.unlock();
+            LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
+            LOG.debug("delete after unlock2 - {}", thread);
+        }
     }
 
-
-    public <T extends DataObject> void put(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.put(store, path, data);
+    public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("put locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).put(store, path, data);
+        }
+        finally {
+            writeL.unlock();
+            LOG.debug("put after unlock - {}", thread);
+        }
     }
 
-
     public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.merge(store, path, data);
-    }
-
-    public FluentFuture<? extends @NonNull CommitInfo> commit() {
-        acquireLock();
-        FluentFuture<? extends @NonNull CommitInfo> future = null;
-        future = rwTx.commit();
-        releaseLock();
-        resetRwTx();
-        return future;
-    }
-
-    public void close() {
-        releaseLock();
-    }
-
-    private void acquireLock() {
-        if (!lock.writeLock().isHeldByCurrentThread()) {
-            lock.writeLock().lock();
-            LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
-            LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
-            rwTx = resetRwTx();
-        } else {
-            LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("merge locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).merge(store, path, data);
         }
-    }
-
-    private void acquireReadLock() {
-        if (lock.getReadHoldCount() > 0) {
-            LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
-        } else {
-            lock.readLock().lock();
-            rwTx = resetRwTx();
-            LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
+        finally {
+            writeL.unlock();
+            LOG.debug("merge after unlock - {}", thread);
         }
     }
 
-    private void releaseLock() {
-        if (lock.writeLock().isHeldByCurrentThread()) {
-            LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
-            lock.writeLock().unlock();
+    public FluentFuture<? extends @NonNull CommitInfo> commit() {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("commit locked by {}", thread);
+        try {
+            if (writeTrMap.containsKey(thread)) {
+                return writeTrMap.get(thread).commit();
+            } else {
+                LOG.warn("No write transaction available for thread {}", thread);
+                return FluentFutures.immediateNullFluentFuture();
+            }
+        }
+        finally {
+            writeTrMap.remove(thread);
+            writeL.unlock();
+            LOG.debug("commit after unlock - {}", thread);
         }
-    }
-
-    private void releaseReadLock() {
-        LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
-        lock.readLock().unlock();
-    }
-
-    private ReadWriteTransaction resetRwTx() {
-        LOG.info("Resetting the read write transaction .....");
-        rwTx = dataBroker.newReadWriteTransaction();
-        return rwTx;
     }
 
     /**