Reimplement RequestProcessor 83/104483/7
authorGilles Thouenon <gilles.thouenon@orange.com>
Wed, 22 Feb 2023 15:15:15 +0000 (16:15 +0100)
committerGilles Thouenon <gilles.thouenon@orange.com>
Sat, 25 Feb 2023 08:25:01 +0000 (09:25 +0100)
To fix datastore concurrency access between openroadm-topology update
and tapi-topology update threads.

JIRA: TRNSPRTPCE-727
Signed-off-by: Gilles Thouenon <gilles.thouenon@orange.com>
Change-Id: Ia71f89659da20549ecd3488f53e838623a8a96df

common/src/main/java/org/opendaylight/transportpce/common/network/RequestProcessor.java
networkmodel/src/main/java/org/opendaylight/transportpce/networkmodel/service/NetworkModelServiceImpl.java
tapi/src/main/java/org/opendaylight/transportpce/tapi/topology/TapiNetworkModelServiceImpl.java

index b58961c8e998d19e4e5f0ca50f6c41c8bff2c89e..940cd4f7ca5eac15baf6ae4e8ef8c24d2204412b 100644 (file)
@@ -11,122 +11,132 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+    justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
 public class RequestProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
 
     private final DataBroker dataBroker;
-    private ReadWriteTransaction rwTx;
-    private ReentrantReadWriteLock lock;
-
+    private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
+    private final Lock readL = rwL.readLock();
+    private final Lock writeL = rwL.writeLock();
+    private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
 
 
     public RequestProcessor(DataBroker dataBroker) {
         this.dataBroker = requireNonNull(dataBroker);
-        rwTx = dataBroker.newReadWriteTransaction();
-        lock = new ReentrantReadWriteLock();
         LOG.info("RequestProcessor instantiated");
-
     }
 
     public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
             InstanceIdentifier<T> path) {
-
-        ListenableFuture<Optional<T>> result = null;
-        acquireReadLock();
-        LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
-        result = rwTx.read(store, path);
-
-        releaseReadLock();
-        return result;
+        ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
+        String thread = Thread.currentThread().getName();
+        readL.lock();
+        LOG.debug("read locked {} by {}", store, thread);
+        try {
+            return readTx.read(store, path);
+        }
+        finally {
+            readTx.close();
+            readL.unlock();
+            LOG.debug("read after unlock - {}", thread);
+        }
     }
 
     public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
-
-        acquireLock();
-        LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.delete(store, path);
+        String thread = Thread.currentThread().getName();
+        LOG.debug("delete - store, thread = {} - {}", store, thread);
+        writeL.lock();
+        LOG.debug("delete locked by {}", thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).delete(store, path);
+        }
+        finally {
+            LOG.debug("delete before unlock - {}", thread);
+            writeL.unlock();
+            LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
+            LOG.debug("delete after unlock2 - {}", thread);
+        }
     }
 
-
-    public <T extends DataObject> void put(LogicalDatastoreType store,
-        InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.put(store, path, data);
+    public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("put locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).put(store, path, data);
+        }
+        finally {
+            writeL.unlock();
+            LOG.debug("put after unlock - {}", thread);
+        }
     }
 
-
     public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
-
-        acquireLock();
-        LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
-        rwTx.merge(store, path, data);
-    }
-
-    public FluentFuture<? extends @NonNull CommitInfo> commit() {
-        acquireLock();
-        FluentFuture<? extends @NonNull CommitInfo> future = null;
-        future = rwTx.commit();
-        releaseLock();
-        resetRwTx();
-        return future;
-    }
-
-    public void close() {
-        releaseLock();
-    }
-
-    private void acquireLock() {
-        if (!lock.writeLock().isHeldByCurrentThread()) {
-            lock.writeLock().lock();
-            LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
-            LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
-            rwTx = resetRwTx();
-        } else {
-            LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("merge locked {} by {}", store, thread);
+        try {
+            if (!writeTrMap.containsKey(thread)) {
+                writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+            }
+            writeTrMap.get(thread).merge(store, path, data);
         }
-    }
-
-    private void acquireReadLock() {
-        if (lock.getReadHoldCount() > 0) {
-            LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
-        } else {
-            lock.readLock().lock();
-            rwTx = resetRwTx();
-            LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
+        finally {
+            writeL.unlock();
+            LOG.debug("merge after unlock - {}", thread);
         }
     }
 
-    private void releaseLock() {
-        if (lock.writeLock().isHeldByCurrentThread()) {
-            LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
-            lock.writeLock().unlock();
+    public FluentFuture<? extends @NonNull CommitInfo> commit() {
+        String thread = Thread.currentThread().getName();
+        writeL.lock();
+        LOG.debug("commit locked by {}", thread);
+        try {
+            if (writeTrMap.containsKey(thread)) {
+                return writeTrMap.get(thread).commit();
+            } else {
+                LOG.warn("No write transaction available for thread {}", thread);
+                return FluentFutures.immediateNullFluentFuture();
+            }
+        }
+        finally {
+            writeTrMap.remove(thread);
+            writeL.unlock();
+            LOG.debug("commit after unlock - {}", thread);
         }
     }
 
-    private void releaseReadLock() {
-        LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
-        lock.readLock().unlock();
-    }
-
-    private ReadWriteTransaction resetRwTx() {
-        LOG.info("Resetting the read write transaction .....");
-        rwTx = dataBroker.newReadWriteTransaction();
-        return rwTx;
+    public void close() {
+        LOG.info("closing RequestProcessor Locks by {}", Thread.currentThread().getName());
+        writeTrMap.remove(Thread.currentThread().getName());
+        readL.unlock();
+        writeL.unlock();
     }
 
     /**
index 3985fa2b1d2eaca23cb8eab1ec4f1db0669b9430..93dbc07f4d492e16a0378f901e3ad3c7512ceb9a 100644 (file)
@@ -787,7 +787,7 @@ public class NetworkModelServiceImpl implements NetworkModelService {
     private void deleteLinks(List<Link> links) {
         for (Link otnTopologyLink : links) {
             LOG.info("deleting link {} from {}", otnTopologyLink.getLinkId().getValue(),
-                NetworkUtils.OVERLAY_NETWORK_ID);
+                NetworkUtils.OTN_NETWORK_ID);
             InstanceIdentifier<Link> iiOtnTopologyLink = InstanceIdentifier.builder(Networks.class)
                 .child(Network.class, new NetworkKey(new NetworkId(NetworkUtils.OTN_NETWORK_ID)))
                 .augmentation(Network1.class)
index ff29646f79c34ccdb872d75131ae1b96c9faf3d6..268c77de3789e8128d4d99a851b875fc768e5d1e 100644 (file)
@@ -300,21 +300,18 @@ public class TapiNetworkModelServiceImpl implements TapiNetworkModelService {
                 LOG.error("Could not update TAPI links");
                 return;
             }
-            Map<LinkKey, Link> links = optTopology.get().getLink();
-            if (links != null) {
-                for (Link link : links.values()) {
-                    List<Uuid> linkNeps = Objects.requireNonNull(link.getNodeEdgePoint()).values().stream()
-                            .map(NodeEdgePointRef::getNodeEdgePointUuid).collect(Collectors.toList());
-                    if (!Collections.disjoint(changedOneps, linkNeps)) {
-                        InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(Context.class)
-                                .augmentation(Context1.class).child(TopologyContext.class)
-                                .child(Topology.class, new TopologyKey(tapiTopoUuid))
-                                .child(Link.class, new LinkKey(link.getUuid())).build();
-                        Link linkblr = new LinkBuilder().setUuid(link.getUuid())
-                                .setAdministrativeState(transformAdminState(mapping.getPortAdminState()))
-                                .setOperationalState(transformOperState(mapping.getPortOperState())).build();
-                        this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, linkIID, linkblr);
-                    }
+            for (Link link : optTopology.get().nonnullLink().values()) {
+                List<Uuid> linkNeps = Objects.requireNonNull(link.getNodeEdgePoint()).values().stream()
+                        .map(NodeEdgePointRef::getNodeEdgePointUuid).collect(Collectors.toList());
+                if (!Collections.disjoint(changedOneps, linkNeps)) {
+                    InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(Context.class)
+                            .augmentation(Context1.class).child(TopologyContext.class)
+                            .child(Topology.class, new TopologyKey(tapiTopoUuid))
+                            .child(Link.class, new LinkKey(link.getUuid())).build();
+                    Link linkblr = new LinkBuilder().setUuid(link.getUuid())
+                            .setAdministrativeState(transformAdminState(mapping.getPortAdminState()))
+                            .setOperationalState(transformOperState(mapping.getPortOperState())).build();
+                    this.networkTransactionService.merge(LogicalDatastoreType.OPERATIONAL, linkIID, linkblr);
                 }
             }
             this.networkTransactionService.commit().get();