BUG-8372: improve forward/replay naming 37/56637/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 5 May 2017 10:04:12 +0000 (12:04 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 6 May 2017 14:56:45 +0000 (14:56 +0000)
There is a bit of confusion between 'replay' and 'forward' methods.
They serve two distinct purposes:
- 'replay' happens during reconnect, i.e. for requests that have
           already entered the connection queue and have paid
           the delay cost, so they should not pay it again.
- 'forward' happens after reconnect for requests that have raced
            with the reconnect process, i.e. they need to hop from
            the old connection to the new one. These need to enter
            the queue and pay the delay cost.

This patch cleans the codepaths up to use consistent naming, making
it clearer that the problem we are seeing is in the 'replay' path.

Change-Id: Id854e09a0308f8d0a9144d59f41e31950cd58665
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit cc21df8ade11f41843dc558e8fc93d5be92ed151)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index f2e72f18ec25044c62bc6e018243e9f871f1bb44..dae12af731846a278386dcf75f8348e61eb30752 100644 (file)
@@ -306,8 +306,8 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
             }
 
             @Override
-            void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
-                proxy.replaySuccessfulRequests(previousEntries);
+            void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+                proxy.replayRequests(previousEntries);
             }
 
             @Override
index bc7eca75b84fb5241c53336e538502b2e209bc92..d9489924570f0eaa9494598e418f0580fd33fe24 100644 (file)
@@ -144,7 +144,7 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
                 // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
                 //         the non-throttling interface to the connection, hence we use a wrapper consumer
                 for (HistoryReconnectCohort c : cohorts) {
-                    c.replaySuccessfulRequests(previousEntries);
+                    c.replayRequests(previousEntries);
                 }
 
                 // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
index 86d7f237411f0068fc4649371d85245c993c8ddf..2fa98981be5bd3b2b58a324258a4eb741d0e5661 100644 (file)
@@ -543,7 +543,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param request Request to be forwarded
      * @param callback Original callback
      */
-    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+    final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
 
         if (successor instanceof LocalProxyTransaction) {
index 3fe6a09bf6d2a9f65864d2ca5b12823f69593c6c..f2734ae15debb7b402f78135f7333bad6dc8bd39 100644 (file)
@@ -78,7 +78,7 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
 
             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
             //        period required to get into the queue.
-            cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
+            cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
         } catch (RequestException e) {
             entry.complete(request.toRequestFailure(e));
         }
index 7e6ff671a443428e960742e7ae28d1cb61d7becc..f1939dc846ce74c1e5e69fdb4e03265f14b25904 100644 (file)
@@ -18,7 +18,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 abstract class HistoryReconnectCohort implements AutoCloseable {
     abstract ProxyReconnectCohort getProxy();
 
-    abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
 
     @Override
     public abstract void close();
index 88e86bc08985f00310444854b20a20e129bd1d50..827c19e526fcbd5f6b3cc1c33dfdc40698af11e9 100644 (file)
@@ -211,7 +211,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
             // First look for our Create message
             for (ConnectionEntry e : previousEntries) {
                 final Request<?, ?> req = e.getRequest();
@@ -257,18 +257,18 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
             if (request instanceof TransactionRequest) {
-                replayTransactionRequest((TransactionRequest<?>) request, callback);
+                forwardTransactionRequest((TransactionRequest<?>) request, callback);
             } else if (request instanceof LocalHistoryRequest) {
-                replayTo.accept(request, callback);
+                forwardTo.accept(request, callback);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void replayTransactionRequest(final TransactionRequest<?> request,
+        private void forwardTransactionRequest(final TransactionRequest<?> request,
                 final Consumer<Response<?, ?>> callback) throws RequestException {
 
             final AbstractProxyTransaction proxy;
@@ -282,7 +282,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 throw new RequestReplayException("Failed to find proxy for %s", request);
             }
 
-            proxy.replayRequest(request, callback);
+            proxy.forwardRequest(request, callback);
         }
     }
 
index 2f97f901ffad0029e8950a0c7fdb7febac3d20c3..0ad4dc53364d97b68ccf99ee02f7c21bd7e21fdd 100644 (file)
@@ -18,10 +18,10 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
 
-    abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
 
     abstract ProxyHistory finishReconnect();
 
-    abstract void replayRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
+    abstract void forwardRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
             BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
 }