import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+ justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
public class RequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
private final DataBroker dataBroker;
- private ReadWriteTransaction rwTx;
- private ReentrantReadWriteLock lock;
-
+ private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
+ private final Lock readL = rwL.readLock();
+ private final Lock writeL = rwL.writeLock();
+ private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
public RequestProcessor(DataBroker dataBroker) {
this.dataBroker = requireNonNull(dataBroker);
- rwTx = dataBroker.newReadWriteTransaction();
- lock = new ReentrantReadWriteLock();
LOG.info("RequestProcessor instantiated");
-
}
public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
InstanceIdentifier<T> path) {
-
- ListenableFuture<Optional<T>> result = null;
- acquireReadLock();
- LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
- result = rwTx.read(store, path);
-
- releaseReadLock();
- return result;
+ ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
+ String thread = Thread.currentThread().getName();
+ readL.lock();
+ LOG.debug("read locked {} by {}", store, thread);
+ try {
+ return readTx.read(store, path);
+ }
+ finally {
+ readTx.close();
+ readL.unlock();
+ LOG.debug("read after unlock - {}", thread);
+ }
}
public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
-
- acquireLock();
- LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
- rwTx.delete(store, path);
+ String thread = Thread.currentThread().getName();
+ LOG.debug("delete - store, thread = {} - {}", store, thread);
+ writeL.lock();
+ LOG.debug("delete locked by {}", thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).delete(store, path);
+ }
+ finally {
+ LOG.debug("delete before unlock - {}", thread);
+ writeL.unlock();
+ LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
+ LOG.debug("delete after unlock2 - {}", thread);
+ }
}
-
- public <T extends DataObject> void put(LogicalDatastoreType store,
- InstanceIdentifier<T> path, T data) {
-
- acquireLock();
- LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
- rwTx.put(store, path, data);
+ public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("put locked {} by {}", store, thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).put(store, path, data);
+ }
+ finally {
+ writeL.unlock();
+ LOG.debug("put after unlock - {}", thread);
+ }
}
-
public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
-
- acquireLock();
- LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
- rwTx.merge(store, path, data);
- }
-
- public FluentFuture<? extends @NonNull CommitInfo> commit() {
- acquireLock();
- FluentFuture<? extends @NonNull CommitInfo> future = null;
- future = rwTx.commit();
- releaseLock();
- resetRwTx();
- return future;
- }
-
- public void close() {
- releaseLock();
- }
-
- private void acquireLock() {
- if (!lock.writeLock().isHeldByCurrentThread()) {
- lock.writeLock().lock();
- LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
- LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
- rwTx = resetRwTx();
- } else {
- LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("merge locked {} by {}", store, thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).merge(store, path, data);
}
- }
-
- private void acquireReadLock() {
- if (lock.getReadHoldCount() > 0) {
- LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
- } else {
- lock.readLock().lock();
- rwTx = resetRwTx();
- LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
+ finally {
+ writeL.unlock();
+ LOG.debug("merge after unlock - {}", thread);
}
}
- private void releaseLock() {
- if (lock.writeLock().isHeldByCurrentThread()) {
- LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
- lock.writeLock().unlock();
+ public FluentFuture<? extends @NonNull CommitInfo> commit() {
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("commit locked by {}", thread);
+ try {
+ if (writeTrMap.containsKey(thread)) {
+ return writeTrMap.get(thread).commit();
+ } else {
+ LOG.warn("No write transaction available for thread {}", thread);
+ return FluentFutures.immediateNullFluentFuture();
+ }
+ }
+ finally {
+ writeTrMap.remove(thread);
+ writeL.unlock();
+ LOG.debug("commit after unlock - {}", thread);
}
}
- private void releaseReadLock() {
- LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
- lock.readLock().unlock();
- }
-
- private ReadWriteTransaction resetRwTx() {
- LOG.info("Resetting the read write transaction .....");
- rwTx = dataBroker.newReadWriteTransaction();
- return rwTx;
+ public void close() {
+ LOG.info("closing RequestProcessor Locks by {}", Thread.currentThread().getName());
+ writeTrMap.remove(Thread.currentThread().getName());
+ readL.unlock();
+ writeL.unlock();
}
/**
LOG.error("Could not update TAPI links");
return;
}
- Map<LinkKey, Link> links = optTopology.get().getLink();
- if (links != null) {
- for (Link link : links.values()) {
- List<Uuid> linkNeps = Objects.requireNonNull(link.getNodeEdgePoint()).values().stream()
- .map(NodeEdgePointRef::getNodeEdgePointUuid).collect(Collectors.toList());
- if (!Collections.disjoint(changedOneps, linkNeps)) {
- InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(Context.class)
- .augmentation(Context1.class).child(TopologyContext.class)
- .child(Topology.class, new TopologyKey(tapiTopoUuid))
- .child(Link.class, new LinkKey(link.getUuid())).build();
- Link linkblr = new LinkBuilder().setUuid(link.getUuid())
- .setAdministrativeState(transformAdminState(mapping.getPortAdminState()))
- .setOperationalState(transformOperState(mapping.getPortOperState())).build();
- this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, linkIID, linkblr);
- }
+ for (Link link : optTopology.get().nonnullLink().values()) {
+ List<Uuid> linkNeps = Objects.requireNonNull(link.getNodeEdgePoint()).values().stream()
+ .map(NodeEdgePointRef::getNodeEdgePointUuid).collect(Collectors.toList());
+ if (!Collections.disjoint(changedOneps, linkNeps)) {
+ InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(Context.class)
+ .augmentation(Context1.class).child(TopologyContext.class)
+ .child(Topology.class, new TopologyKey(tapiTopoUuid))
+ .child(Link.class, new LinkKey(link.getUuid())).build();
+ Link linkblr = new LinkBuilder().setUuid(link.getUuid())
+ .setAdministrativeState(transformAdminState(mapping.getPortAdminState()))
+ .setOperationalState(transformOperState(mapping.getPortOperState())).build();
+ this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, linkIID, linkblr);
}
}
this.networkTransactionService.commit().get();