BUG-5280: expose queue messages during reconnect
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
index ddb7bcdad112e84623c797f8790a968eac07791a..45580e92fd734489616c4840a5feac2f529aafd8 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
@@ -21,6 +22,7 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -36,6 +38,20 @@ import org.slf4j.LoggerFactory;
 @Beta
 public abstract class ClientActorBehavior<T extends BackendInfo> extends
         RecoveredClientActorBehavior<ClientActorContext> implements Identifiable<ClientIdentifier> {
+    /**
+     * Connection reconnect cohort, driven by this class.
+     */
+    @FunctionalInterface
+    protected interface ConnectionConnectCohort {
+        /**
+         * Finish the connection by replaying previous messages onto the new connection.
+         *
+         * @param enqueuedEntries Previously-enqueued entries
+         * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
+         */
+        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
 
     /**
@@ -185,28 +201,42 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
     }
 
     /**
-     * Callback invoked when a new connection has been established.
+     * Callback invoked when a new connection has been established. Implementations are expected perform preparatory
+     * tasks before the previous connection is frozen.
      *
-     * @param conn Old connection
-     * @param backend New backend
-     * @return Newly-connected connection.
+     * @param newConn New connection
+     * @return ConnectionConnectCohort which will be used to complete the process of bringing the connection up.
      */
     @GuardedBy("connectionsLock")
-    protected abstract @Nonnull ConnectedClientConnection<T> connectionUp(
-            final @Nonnull AbstractClientConnection<T> conn, final @Nonnull T backend);
+    @Nonnull protected abstract ConnectionConnectCohort connectionUp(@Nonnull ConnectedClientConnection<T> newConn);
 
     private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
             final T backend, final Throwable failure) {
         if (failure != null) {
             LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
+            conn.poison(new RuntimeRequestException("Failed to resolve shard " + shard, failure));
             return;
         }
 
         LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend);
         final long stamp = connectionsLock.writeLock();
         try {
-            // Bring the connection up
-            final ConnectedClientConnection<T> newConn = connectionUp(conn, backend);
+            // Create a new connected connection
+            final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
+                    conn.cookie(), backend);
+            LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+
+            // Start reconnecting without the old connection lock held
+            final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
+
+            // Lock the old connection and get a reference to its entries
+            final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+
+            // Finish the connection attempt
+            final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
+
+            // Install the forwarder, unlocking the old connection
+            conn.finishReplay(forwarder);
 
             // Make sure new lookups pick up the new connection
             connections.replace(shard, conn, newConn);