BUG-5280: synchronize access to local histories 27/49427/5
authorRobert Varga <rovarga@cisco.com>
Thu, 15 Dec 2016 13:24:47 +0000 (14:24 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 17 Dec 2016 00:21:37 +0000 (00:21 +0000)
There is a subtle race between the allocation of histories
and the reconnect process, which could allow a local history
to be created when its connection is being reestablished but
after the cohorts for the connection have already been captured.

Change-Id: I230b5c00844d8e82775efc8f70368c2f63eabb1e
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java

index 202fe5b8fac5f9a1b9481bc2b7383220e06e77ca..9af88a8a5a7b9c8a3070ad5f24ae9a6ce06b3a20 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.StampedLock;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
@@ -62,6 +63,7 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
 
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
     private final AtomicLong nextHistoryId = new AtomicLong(1);
 
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
     private final AtomicLong nextHistoryId = new AtomicLong(1);
+    private final StampedLock lock = new StampedLock();
     private final SingleClientHistory singleHistory;
 
     private volatile Throwable aborted;
     private final SingleClientHistory singleHistory;
 
     private volatile Throwable aborted;
@@ -89,14 +91,19 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     }
 
     private void abortOperations(final Throwable cause) {
     }
 
     private void abortOperations(final Throwable cause) {
-        // This acts as a barrier, application threads check this after they have added an entry in the maps,
-        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
-        aborted = cause;
-
-        for (ClientLocalHistory h : histories.values()) {
-            h.localAbort(cause);
+        final long stamp = lock.writeLock();
+        try {
+            // This acts as a barrier, application threads check this after they have added an entry in the maps,
+            // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+            aborted = cause;
+
+            for (ClientLocalHistory h : histories.values()) {
+                h.localAbort(cause);
+            }
+            histories.clear();
+        } finally {
+            lock.unlockWrite(stamp);
         }
         }
-        histories.clear();
     }
 
     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
     }
 
     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
@@ -121,6 +128,8 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
      */
     @Override
     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
      */
     @Override
     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+        final long stamp = lock.writeLock();
+
         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
         //         further TransactionProxies can be created and we can safely traverse maps without risking
         //         missing an entry
         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
         //         further TransactionProxies can be created and we can safely traverse maps without risking
         //         missing an entry
@@ -143,9 +152,13 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
                 //         forwarded once they hit the new connection.
                 return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
             } finally {
                 //         forwarded once they hit the new connection.
                 return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
             } finally {
-                // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
-                for (HistoryReconnectCohort c : cohorts) {
-                    c.close();
+                try {
+                    // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+                    for (HistoryReconnectCohort c : cohorts) {
+                        c.close();
+                    }
+                } finally {
+                    lock.unlockWrite(stamp);
                 }
             }
         };
                 }
             }
         };
@@ -170,19 +183,21 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     public final ClientLocalHistory createLocalHistory() {
         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
             nextHistoryId.getAndIncrement());
     public final ClientLocalHistory createLocalHistory() {
         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
             nextHistoryId.getAndIncrement());
-        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
-        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
 
 
-        Verify.verify(histories.put(historyId, history) == null);
+        final long stamp = lock.readLock();
+        try {
+            if (aborted != null) {
+                throw Throwables.propagate(aborted);
+            }
+
+            final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+            LOG.debug("{}: creating a new local history {}", persistenceId(), history);
 
 
-        final Throwable a = aborted;
-        if (a != null) {
-            history.localAbort(a);
-            histories.remove(historyId, history);
-            throw Throwables.propagate(a);
+            Verify.verify(histories.put(historyId, history) == null);
+            return history;
+        } finally {
+            lock.unlockRead(stamp);
         }
         }
-
-        return history;
     }
 
     @Override
     }
 
     @Override