X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Ftransportpce%2Fcommon%2Fnetwork%2FRequestProcessor.java;h=8a4780ca104bb107ab4ede0c0f88cb070e966322;hb=f87e6b2c57c7c994e223708ab5625310a35200fc;hp=54a88c5afed759ca94c8cb1e593973ab71bdda52;hpb=3cc9cc2152696a25177e965a40d63a5c0ab5f73f;p=transportpce.git diff --git a/common/src/main/java/org/opendaylight/transportpce/common/network/RequestProcessor.java b/common/src/main/java/org/opendaylight/transportpce/common/network/RequestProcessor.java index 54a88c5af..8a4780ca1 100644 --- a/common/src/main/java/org/opendaylight/transportpce/common/network/RequestProcessor.java +++ b/common/src/main/java/org/opendaylight/transportpce/common/network/RequestProcessor.java @@ -7,127 +7,137 @@ */ package org.opendaylight.transportpce.common.network; +import static java.util.Objects.requireNonNull; + import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.ReadTransaction; -import org.opendaylight.mdsal.binding.api.ReadWriteTransaction; +import org.opendaylight.mdsal.binding.api.WriteTransaction; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH", + justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock") public class RequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class); private final DataBroker dataBroker; - private ReadWriteTransaction rwTx; - private ReadTransaction readTx; - private ReentrantReadWriteLock lock; - + private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock(); + private final Lock readL = rwL.readLock(); + private final Lock writeL = rwL.writeLock(); + private Map writeTrMap = new HashMap<>(); public RequestProcessor(DataBroker dataBroker) { - this.dataBroker = dataBroker; - rwTx = dataBroker.newReadWriteTransaction(); - readTx = dataBroker.newReadOnlyTransaction(); - lock = new ReentrantReadWriteLock(); + this.dataBroker = requireNonNull(dataBroker); LOG.info("RequestProcessor instantiated"); - } - public ListenableFuture> - read(LogicalDatastoreType store,InstanceIdentifier path) { - - ListenableFuture> result = null; - acquireReadLock(); - LOG.debug("Number of threads in queue to read {}", lock.getQueueLength()); - result = rwTx.read(store, path); - - releaseReadLock(); - return result; + public ListenableFuture> read(LogicalDatastoreType store, + InstanceIdentifier path) { + ReadTransaction readTx = dataBroker.newReadOnlyTransaction(); + String thread = Thread.currentThread().getName(); + readL.lock(); + LOG.debug("read locked {} by {}", store, thread); + try { + return readTx.read(store, path); + } + finally { + readTx.close(); + readL.unlock(); + LOG.debug("read after unlock - {}", thread); + } } public void delete(LogicalDatastoreType store, InstanceIdentifier path) { - - acquireLock(); - LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength()); - rwTx.delete(store, path); - } - - - public void put(LogicalDatastoreType store, - InstanceIdentifier path, T data) { - - acquireLock(); - LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength()); - rwTx.put(store, path, data); - } - - - public void merge(LogicalDatastoreType store, - InstanceIdentifier path, T data) { - - acquireLock(); - LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength()); - rwTx.merge(store, path, data); - } - - public FluentFuture commit() { - acquireLock(); - FluentFuture future = null; - future = rwTx.commit(); - releaseLock(); - resetRwTx(); - return future; - } - - public void close() { - releaseLock(); - } - - private void acquireLock() { - if (!lock.writeLock().isHeldByCurrentThread()) { - lock.writeLock().lock(); - LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength()); - LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName()); - rwTx = resetRwTx(); - } else { - LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName()); + String thread = Thread.currentThread().getName(); + LOG.debug("delete - store, thread = {} - {}", store, thread); + writeL.lock(); + LOG.debug("delete locked by {}", thread); + try { + if (!writeTrMap.containsKey(thread)) { + writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction()); + } + writeTrMap.get(thread).delete(store, path); + } + finally { + LOG.debug("delete before unlock - {}", thread); + writeL.unlock(); + LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName()); + LOG.debug("delete after unlock2 - {}", thread); } } - private void acquireReadLock() { - if (lock.getReadHoldCount() > 0) { - LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName()); - } else { - lock.readLock().lock(); - rwTx = resetRwTx(); - LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName()); + public void put(LogicalDatastoreType store, InstanceIdentifier path, T data) { + String thread = Thread.currentThread().getName(); + writeL.lock(); + LOG.debug("put locked {} by {}", store, thread); + try { + if (!writeTrMap.containsKey(thread)) { + writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction()); + } + writeTrMap.get(thread).put(store, path, data); + } + finally { + writeL.unlock(); + LOG.debug("put after unlock - {}", thread); } } - private void releaseLock() { - if (lock.writeLock().isHeldByCurrentThread()) { - LOG.info("Write Lock released by : {}", Thread.currentThread().getName()); - lock.writeLock().unlock(); + public void merge(LogicalDatastoreType store, InstanceIdentifier path, T data) { + String thread = Thread.currentThread().getName(); + writeL.lock(); + LOG.debug("merge locked {} by {}", store, thread); + try { + if (!writeTrMap.containsKey(thread)) { + writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction()); + } + writeTrMap.get(thread).merge(store, path, data); + } + finally { + writeL.unlock(); + LOG.debug("merge after unlock - {}", thread); } } - private void releaseReadLock() { - LOG.info("Read Lock released by : {}", Thread.currentThread().getName()); - lock.readLock().unlock(); + public FluentFuture commit() { + String thread = Thread.currentThread().getName(); + writeL.lock(); + LOG.debug("commit locked by {}", thread); + try { + if (writeTrMap.containsKey(thread)) { + return writeTrMap.get(thread).commit(); + } else { + LOG.warn("No write transaction available for thread {}", thread); + return FluentFutures.immediateNullFluentFuture(); + } + } + finally { + writeTrMap.remove(thread); + writeL.unlock(); + LOG.debug("commit after unlock - {}", thread); + } } - private ReadWriteTransaction resetRwTx() { - LOG.info("Resetting the read write transaction ....."); - rwTx = dataBroker.newReadWriteTransaction(); - return rwTx; + /** + * Return the dataBroker related to RequestProcessor. + * @return the dataBroker + */ + public DataBroker getDataBroker() { + return dataBroker; } }