BUG-5280: expose queue messages during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
index 5a34b3b77e2afd8419ae100929960e9b6ff02be8..3dc4dbf1469d989c05800bce57d3a175bd2e76cd 100644 (file)
@@ -16,8 +16,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
@@ -122,44 +120,35 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
      * involved, as the messages need to be replayed to the individual proxies.
      */
     @Override
-    @GuardedBy("connectionsLock")
-    protected final ConnectedClientConnection<ShardBackendInfo> connectionUp(
-            final AbstractClientConnection<ShardBackendInfo> conn, final ShardBackendInfo backend) {
-
-        // Step 0: create a new connected connection
-        final ConnectedClientConnection<ShardBackendInfo> newConn = new ConnectedClientConnection<>(conn.context(),
-                conn.cookie(), backend);
-
-        LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
-
+    protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+        // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
+        //         further TransactionProxies can be created and we can safely traverse maps without risking
+        //         missing an entry
         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
-        try {
-            // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
-            //         further TransactionProxies can be created and we can safely traverse maps without risking
-            //         missing an entry
-            startReconnect(singleHistory, newConn, cohorts);
-            for (ClientLocalHistory h : histories.values()) {
-                startReconnect(h, newConn, cohorts);
-            }
-
-            // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
-            //         the non-throttling interface to the connection, hence we use a wrapper consumer
-            for (HistoryReconnectCohort c : cohorts) {
-                c.replaySuccessfulRequests();
-            }
-
-            // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
-            //         requests will be immediately sent to it and requests being sent concurrently will get forwarded
-            //         once they hit the new connection.
-            conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
-        } finally {
-            // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
-            for (HistoryReconnectCohort c : cohorts) {
-                c.close();
-            }
+        startReconnect(singleHistory, newConn, cohorts);
+        for (ClientLocalHistory h : histories.values()) {
+            startReconnect(h, newConn, cohorts);
         }
 
-        return newConn;
+        return previousEntries -> {
+            try {
+                // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+                //         the non-throttling interface to the connection, hence we use a wrapper consumer
+                for (HistoryReconnectCohort c : cohorts) {
+                    c.replaySuccessfulRequests(previousEntries);
+                }
+
+                // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+                //         requests will be immediately sent to it and requests being sent concurrently will get
+                //         forwarded once they hit the new connection.
+                return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+            } finally {
+                // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+                for (HistoryReconnectCohort c : cohorts) {
+                    c.close();
+                }
+            }
+        };
     }
 
     private static void startReconnect(final AbstractClientHistory history,