BUG-5280: fix race proxy creation and reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
index 1be84643350acdd197f5ab5dfe10dfcf558b146e..47283843d478401a2b72275e6ea87ca3347a0fcc 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
@@ -53,7 +54,10 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     @GuardedBy("this")
     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
 
+    @GuardedBy("lock")
     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
+    private final StampedLock lock = new StampedLock();
+
     private final AbstractDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
@@ -113,6 +117,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      *
      * @throws InversibleLockException if the shard is being reconnected
      */
+    @GuardedBy("lock")
     private ProxyHistory createHistoryProxy(final Long shard) {
         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
@@ -139,7 +144,14 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
         while (true) {
             try {
-                return histories.computeIfAbsent(shard, this::createHistoryProxy);
+                // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
+                // see comments in startReconnect() for details.
+                final long stamp = lock.readLock();
+                try {
+                    return histories.computeIfAbsent(shard, this::createHistoryProxy);
+                } finally {
+                    lock.unlockRead(stamp);
+                }
             } catch (InversibleLockException e) {
                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
                 e.awaitResolution();
@@ -248,7 +260,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     }
 
     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
-        final ProxyHistory oldProxy = histories.get(newConn.cookie());
+        /*
+         * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
+         *
+         * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
+         * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
+         * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
+         * the history map.
+         *
+         * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
+         * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
+         * requests are established to happen either before or after the reconnect attempt.
+         */
+        final ProxyHistory oldProxy;
+        final long stamp = lock.writeLock();
+        try {
+            oldProxy = histories.get(newConn.cookie());
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+
         if (oldProxy == null) {
             return null;
         }