BUG-8422: Propagate enqueue time 51/56951/4
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 9 May 2017 21:55:31 +0000 (23:55 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 12 May 2017 21:35:18 +0000 (21:35 +0000)
When we are replaying requests onto a connection we really want
to leave their enqueue times intact, so they time out properly.
This codepath is specific for the replay case, hence we do not
want to incur any waiting, either.

This patch introduces enqueueRequest() which does not wait for
the queue duration and audits code paths so they end up talking
to the right method -- either enqueueRequest() or sendRequest().

Change-Id: Ibf97dcc11e32d9ffa911c78ccf0448d6891a9cac
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 17e4759c7561e09786a22210e43b5b32db45149e)

15 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java

index 2423473472301b8b50a77b4758cc3b7eee138f03..d37893bd7c3a9862ad89fb9d88b94337d0355f13 100644 (file)
@@ -87,6 +87,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return context.self();
     }
 
+    public final long currentTime() {
+        return context.ticker().read();
+    }
+
     /**
      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
      * from any thread.
@@ -98,13 +102,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * @param callback Callback to invoke
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        final RequestException maybePoison = poisoned;
-        if (maybePoison != null) {
-            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+        final long now = currentTime();
+        final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+        try {
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
         }
+    }
 
-        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-        enqueueAndWait(entry, entry.getEnqueuedTicks());
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * <p>
+     * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
+     * should never be called from an application thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
+     */
+    public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
     public abstract Optional<T> getBackendInfo();
@@ -122,19 +144,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, readTime());
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
 
-    private long readTime() {
-        return context.ticker().read();
-    }
-
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
+            final RequestException maybePoison = poisoned;
+            if (maybePoison != null) {
+                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+            }
+
             if (queue.isEmpty()) {
                 // The queue is becoming non-empty, schedule a timer
                 scheduleTimer(REQUEST_TIMEOUT_DURATION);
@@ -145,15 +168,6 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    final void enqueueAndWait(final ConnectionEntry entry, final long now) {
-        final long delay = enqueueEntry(entry, now);
-        try {
-            TimeUnit.NANOSECONDS.sleep(delay);
-        } catch (InterruptedException e) {
-            LOG.debug("Interrupted while sleeping", e);
-        }
-    }
-
     final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
         lock.lock();
         try {
@@ -197,7 +211,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         lock.lock();
         try {
             haveTimer = false;
-            final long now = readTime();
+            final long now = currentTime();
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             final long ticksSinceProgress = queue.ticksStalling(now);
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
@@ -284,7 +298,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
-        final long now = readTime();
+        final long now = currentTime();
 
         final Optional<TransmittedConnectionEntry> maybeEntry;
         lock.lock();
index 547537764ac7507322399e89223e8ef7144e81d6..8d769b273814e735691cd66206de5644549fd433 100644 (file)
@@ -50,7 +50,7 @@ public class ConnectionEntry implements Immutable {
         callback.accept(response);
     }
 
-    final long getEnqueuedTicks() {
+    public final long getEnqueuedTicks() {
         return enqueuedTicks;
     }
 
@@ -60,6 +60,6 @@ public class ConnectionEntry implements Immutable {
     }
 
     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return toStringHelper.add("request", request);
+        return toStringHelper.add("request", request).add("enqueuedTicks", enqueuedTicks);
     }
 }
index d384ba47369ce5dc034028d12cad11bb54f8e3ae..9ab80d0d0085df1ef612e48606079b4eae413ea0 100644 (file)
@@ -159,6 +159,9 @@ abstract class TransmitQueue {
             return 0;
         }
 
+        // XXX: we should place a guard against incorrect entry sequences:
+        // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
+
         // Reserve an entry before we do anything that can fail
         final long delay = tracker.openTask(now);
         if (canTransmitCount(inflight.size()) <= 0) {
index 83ba07b69ac8f6e3ed253d18e420315065069e4d..46af74a5630999e0c75f430955875754c274dcb5 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -229,6 +230,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return doRead(path);
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback);
+        parent.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
         parent.sendRequest(request, callback);
@@ -324,10 +331,15 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            purge();
+            sendPurge();
         });
     }
 
+    final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
+            enqueuedTicks);
+    }
+
     final void sendAbort(final Consumer<Response<?, ?>> callback) {
         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
     }
@@ -357,7 +369,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    purge();
+                    sendPurge();
                 });
 
                 return ret;
@@ -451,11 +463,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
-            purge();
+            sendPurge();
         });
     }
 
-    void purge() {
+    final void sendPurge() {
         successfulRequests.clear();
 
         final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
@@ -465,6 +477,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
+    final void enqueuePurge(final long enqueuedTicks) {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        enqueueRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
+            parent.completeTransaction(this);
+        }, enqueuedTicks);
+    }
+
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
@@ -489,17 +511,26 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         local.setSuccessor(successor);
 
         // Replay successful requests first
-        for (Object obj : successfulRequests) {
-            if (obj instanceof TransactionRequest) {
-                LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                successor.replay((TransactionRequest<?>) obj, response -> { });
-            } else {
-                Verify.verify(obj instanceof IncrementSequence);
-                successor.incrementSequence(((IncrementSequence) obj).getDelta());
+        if (!successfulRequests.isEmpty()) {
+            // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+            // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+            // time. We will pick the time of the first entry available. If there is none, we will just use current
+            // time, as all other requests will get enqueued afterwards.
+            final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+            final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+            for (Object obj : successfulRequests) {
+                if (obj instanceof TransactionRequest) {
+                    LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+                    successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+                } else {
+                    Verify.verify(obj instanceof IncrementSequence);
+                    successor.incrementSequence(((IncrementSequence) obj).getDelta());
+                }
             }
+            LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+            successfulRequests.clear();
         }
-        LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
-        successfulRequests.clear();
 
         // Now replay whatever is in the connection
         final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
@@ -509,8 +540,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
-                LOG.debug("Forwarding queued request {} to successor {}", req, successor);
-                successor.replay((TransactionRequest<?>) req, e.getCallback());
+                LOG.debug("Replaying queued request {} to successor {}", req, successor);
+                successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
                 it.remove();
             }
         }
@@ -537,12 +568,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks ticker-based time stamp when the request was enqueued
      */
-    private void replay(TransactionRequest<?> request, Consumer<Response<?, ?>> callback) {
+    private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
         if (request instanceof AbstractLocalTransactionRequest) {
-            handleForwardedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback);
+            handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
         } else {
-            handleForwardedRemoteRequest(request, callback);
+            handleReplayedRemoteRequest(request, callback, enqueuedTicks);
         }
     }
 
@@ -563,8 +596,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Original callback
      */
     final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
-        final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
+        forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback);
+    }
 
+    final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
         if (successor instanceof LocalProxyTransaction) {
             forwardToLocal((LocalProxyTransaction)successor, request, callback);
         } else if (successor instanceof RemoteProxyTransaction) {
@@ -615,9 +651,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks Time stamp to use for enqueue time
      */
-    abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest<?> request,
-            @Nullable Consumer<Response<?, ?>> callback);
+    abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     /**
      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
@@ -627,9 +664,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks Time stamp to use for enqueue time
      */
-    abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
-            @Nullable Consumer<Response<?, ?>> callback);
+    abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     @Override
     public final String toString() {
index f2734ae15debb7b402f78135f7333bad6dc8bd39..a518c551694cfe0f1a1d3827b5a8555d81d5bad3 100644 (file)
@@ -55,7 +55,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
     }
 
-
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
         final Request<? , ?> request = entry.getRequest();
@@ -76,8 +75,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
                 throw new CohortNotFoundException(historyId);
             }
 
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
             cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
         } catch (RequestException e) {
             entry.complete(request.toRequestFailure(e));
index 7facc5160a1b573028f6b7d076a7f7d67cbe08aa..b22829370336e26f0da06c9c344ba1fba9c8add0 100644 (file)
@@ -72,6 +72,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
 
+    abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
     @Override
     final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
@@ -90,30 +93,61 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
         if (request instanceof AbortLocalTransactionRequest) {
-            sendAbort(request, callback);
+            enqueueAbort(request, callback, enqueuedTicks);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
     }
 
-    @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+    private boolean handleReadRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
-        if (request instanceof ModifyTransactionRequest) {
-            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
-        } else if (request instanceof ReadTransactionRequest) {
+        if (request instanceof ReadTransactionRequest) {
             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
             final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
             callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            return true;
         } else if (request instanceof ExistsTransactionRequest) {
             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
             final boolean result = readOnlyView().readNode(path).isPresent();
             callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        if (request instanceof ModifyTransactionRequest) {
+            replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
+        } else if (handleReadRequest(request, callback)) {
+            // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            purge();
+            enqueuePurge(enqueuedTicks);
+        } else {
+            throw new IllegalArgumentException("Unhandled request " + request);
+        }
+    }
+
+    /**
+     * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)},
+     * except it is invoked in the forwarding path from
+     * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}.
+     *
+     * @param request Forwarded request
+     * @param callback Callback to be invoked once the request completes
+     */
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        if (request instanceof ModifyTransactionRequest) {
+            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else if (handleReadRequest(request, callback)) {
+            // No-op
+        } else if (request instanceof TransactionPurgeRequest) {
+            sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -153,7 +187,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.purge();
+            successor.sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -165,7 +199,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.purge();
+            successor.sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -176,4 +210,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         sendRequest(request, callback);
     }
+
+    void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        enqueueRequest(request, callback, enqueuedTicks);
+    }
 }
index 0f2d00771677f8bab1a01315f820ad046ad0fcb2..b600a66ea6089c1473d28744481107d1090d2262 100644 (file)
@@ -79,10 +79,23 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     @Override
     void applyModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback) {
+        commonModifyTransactionRequest(request, callback);
+        abort();
+    }
+
+    @Override
+    void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        commonModifyTransactionRequest(request, callback);
+        // FIXME: this should go through the enqueueRequest() path
+        abort();
+    }
+
+    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+            final Consumer<Response<?, ?>> callback) {
         Verify.verify(request.getModifications().isEmpty());
 
         final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
         Verify.verify(protocol == PersistenceProtocol.ABORT);
-        abort();
     }
 }
index 720ada3191e7fd5d06892e0583dcabd55097ffa6..424a9ea0db220752c979dbdae89672bfcb505790 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
@@ -203,6 +205,18 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     @Override
     void applyModifyTransactionRequest(final ModifyTransactionRequest request,
             final @Nullable Consumer<Response<?, ?>> callback) {
+        commonModifyTransactionRequest(request, callback, this::sendRequest);
+    }
+
+    @Override
+    void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
+    }
+
+    private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback,
+            final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
         for (final TransactionModification mod : request.getModifications()) {
             if (mod instanceof TransactionWrite) {
                 write(mod.getPath(), ((TransactionWrite)mod).getData());
@@ -215,23 +229,23 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             }
         }
 
-        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+        final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
             Verify.verify(callback != null, "Request {} has null callback", request);
             ensureSealed();
 
             switch (maybeProtocol.get()) {
                 case ABORT:
-                    sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
+                    sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
                     break;
                 case READY:
                     // No-op, as we have already issued a seal()
                     break;
                 case SIMPLE:
-                    sendRequest(commitRequest(false), callback);
+                    sendMethod.accept(commitRequest(false), callback);
                     break;
                 case THREE_PHASE:
-                    sendRequest(commitRequest(true), callback);
+                    sendMethod.accept(commitRequest(true), callback);
                     break;
                 default:
                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
@@ -240,18 +254,35 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long now) {
         if (request instanceof CommitLocalTransactionRequest) {
             sendCommit((CommitLocalTransactionRequest) request, callback);
         } else {
-            super.handleForwardedLocalRequest(request, callback);
+            super.handleReplayedLocalRequest(request, callback, now);
         }
     }
 
     @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
-            final @Nullable Consumer<Response<?, ?>> callback) {
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        LOG.debug("Applying replayed request {}", request);
+
+        if (request instanceof TransactionPreCommitRequest) {
+            enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionAbortRequest) {
+            enqueueAbort(callback, enqueuedTicks);
+        } else {
+            super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
+        }
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         LOG.debug("Applying forwarded request {}", request);
 
         if (request instanceof TransactionPreCommitRequest) {
@@ -283,6 +314,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         closedException = this::abortedException;
     }
 
+    @Override
+    void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        super.enqueueAbort(request, callback, enqueuedTicks);
+        closedException = this::abortedException;
+    }
+
     private CursorAwareDataTreeModification getModification() {
         if (closedException != null) {
             throw closedException.get();
index 2a21b8e858c9548ccf5ef82206042b83f38cb4e5..8c3b485134c6f801f35f475c48cd33b1f5da3a6d 100644 (file)
@@ -327,6 +327,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final long currentTime() {
+        return connection.currentTime();
+    }
+
     final ActorRef localActor() {
         return connection.localActor();
     }
@@ -391,6 +395,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        connection.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
index 192205dc0a9c0f1df53b8177484f8bf3c0a2282f..09a8a605631c1f9d4b5cbe07e95a42a13077e85a 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -97,17 +96,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doDelete(final YangInstanceIdentifier path) {
-        appendModification(new TransactionDelete(path));
+        appendModification(new TransactionDelete(path), Optional.absent());
     }
 
     @Override
     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        appendModification(new TransactionMerge(path, data));
+        appendModification(new TransactionMerge(path, data), Optional.absent());
     }
 
     @Override
     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        appendModification(new TransactionWrite(path, data));
+        appendModification(new TransactionWrite(path, data), Optional.absent());
     }
 
     private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
@@ -158,83 +157,42 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
+    private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
+        if (builderBusy) {
+            flushBuilder(enqueuedTicks);
+        }
+    }
+
     private void flushBuilder() {
+        flushBuilder(Optional.absent());
+    }
+
+    private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
 
-        sendModification(request);
+        sendModification(request, enqueuedTicks);
     }
 
-    private void sendModification(final TransactionRequest<?> request) {
-        sendRequest(request, response -> completeModify(request, response));
-    }
-
-    @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
-        if (request instanceof CommitLocalTransactionRequest) {
-            replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback);
-        } else if (request instanceof AbortLocalTransactionRequest) {
-            sendRequest(abortRequest(), callback);
+    private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
+        if (enqueuedTicks.isPresent()) {
+            enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
         } else {
-            throw new IllegalStateException("Unhandled request " + request);
+            sendRequest(request, response -> completeModify(request, response));
         }
     }
 
-    private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
-            final Consumer<Response<?, ?>> callback) {
-        final DataTreeModification mod = request.getModification();
-        mod.applyToCursor(new AbstractDataTreeModificationCursor() {
-            @Override
-            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-                doWrite(current().node(child), data);
-            }
-
-            @Override
-            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-                doMerge(current().node(child), data);
-            }
-
-            @Override
-            public void delete(final PathArgument child) {
-                doDelete(current().node(child));
-            }
-        });
-
-        sendRequest(commitRequest(request.isCoordinated()), callback);
-    }
-
-    @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
-            final @Nullable Consumer<Response<?, ?>> callback) {
-        nextSequence();
-
-        if (callback == null) {
-            sendModification(request);
-            return;
-        }
-
-        /*
-         * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null
-         * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below.
-         */
-        final Consumer<Response<?, ?>> findBugsIsStupid = callback;
-
-        // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-        //        period required to get into the queue.
-        sendRequest(request, response -> {
-            findBugsIsStupid.accept(Preconditions.checkNotNull(response));
-            completeModify(request, response);
-        });
+    private void appendModification(final TransactionModification modification) {
+        appendModification(modification, Optional.absent());
     }
 
-    private void appendModification(final TransactionModification modification) {
+    private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
         if (operationFailure == null) {
             ensureInitializedBuilder();
 
             builder.addModification(modification);
             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
-                flushBuilder();
+                flushBuilder(enqueuedTicks);
             }
         } else {
             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
@@ -329,7 +287,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         if (builderBusy) {
             final ModifyTransactionRequest request = builder.build();
             builderBusy = false;
-            successor.handleForwardedRemoteRequest(request, null);
+            forwardToSuccessor(successor, request, null);
         }
     }
 
@@ -349,15 +307,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             if (maybeProto.isPresent()) {
                 ensureSealed();
 
+                final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
                     case ABORT:
-                        sendRequest(abortRequest(), callback);
+                        tmp = abortRequest();
+                        sendRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            callback.accept(resp);
+                        });
                         break;
                     case SIMPLE:
-                        sendRequest(commitRequest(false), callback);
+                        tmp = commitRequest(false);
+                        sendRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            callback.accept(resp);
+                        });
                         break;
                     case THREE_PHASE:
-                        sendRequest(commitRequest(true), callback);
+                        tmp = commitRequest(true);
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     case READY:
                         //no op
@@ -369,14 +340,25 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         } else if (request instanceof ReadTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    callback.accept(resp);
+                });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    callback.accept(resp);
+                });
         } else if (request instanceof TransactionPreCommitRequest) {
             ensureFlushedBuider();
-            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+            final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+                localActor());
+            sendRequest(tmp, resp -> {
+                recordSuccessfulRequest(tmp);
+                callback.accept(resp);
+            });
         } else if (request instanceof TransactionDoCommitRequest) {
             ensureFlushedBuider();
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
@@ -384,7 +366,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            purge();
+            sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -395,4 +377,123 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             final Consumer<Response<?, ?>> callback) {
         successor.handleForwardedRemoteRequest(request, callback);
     }
+
+    @Override
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        if (request instanceof CommitLocalTransactionRequest) {
+            replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            enqueueRequest(abortRequest(), callback, enqueuedTicks);
+        } else {
+            throw new IllegalStateException("Unhandled request " + request);
+        }
+    }
+
+    private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        final DataTreeModification mod = request.getModification();
+        final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+        mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+            @Override
+            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                appendModification(new TransactionWrite(current().node(child), data), optTicks);
+            }
+
+            @Override
+            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                appendModification(new TransactionMerge(current().node(child), data), optTicks);
+            }
+
+            @Override
+            public void delete(final PathArgument child) {
+                appendModification(new TransactionDelete(current().node(child)), optTicks);
+            }
+        });
+
+        enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
+    }
+
+    @Override
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+        final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+        if (request instanceof ModifyTransactionRequest) {
+            final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+            for (TransactionModification mod : req.getModifications()) {
+                appendModification(mod, optTicks);
+            }
+
+            final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+            if (maybeProto.isPresent()) {
+                ensureSealed();
+
+                final TransactionRequest<?> tmp;
+                switch (maybeProto.get()) {
+                    case ABORT:
+                        tmp = abortRequest();
+                        enqueueRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case SIMPLE:
+                        tmp = commitRequest(false);
+                        enqueueRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case THREE_PHASE:
+                        tmp = commitRequest(true);
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case READY:
+                        //no op
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+                }
+            }
+        } else if (request instanceof ReadTransactionRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    cb.accept(resp);
+                }, enqueuedTicks);
+        } else if (request instanceof ExistsTransactionRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    cb.accept(resp);
+                }, enqueuedTicks);
+        } else if (request instanceof TransactionPreCommitRequest) {
+            ensureFlushedBuider(optTicks);
+            final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+                localActor());
+            enqueueRequest(tmp, resp -> {
+                recordSuccessfulRequest(tmp);
+                cb.accept(resp);
+            }, enqueuedTicks);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionAbortRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueAbort(callback, enqueuedTicks);
+        } else if (request instanceof TransactionPurgeRequest) {
+            enqueuePurge(enqueuedTicks);
+        } else {
+            throw new IllegalArgumentException("Unhandled request {}" + request);
+        }
+    }
 }
index e02701d2600a095bd7a2124f5e1d6b541256c739..9db19077e776e8817c77a631b39c49eb25a62a16 100644 (file)
@@ -17,6 +17,7 @@ import static org.mockito.Mockito.when;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.primitives.UnsignedLong;
 import java.util.ArrayList;
 import java.util.List;
@@ -169,16 +170,16 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         final List<ConnectionEntry> entries = new ArrayList<>();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
         final ReadTransactionRequest request1 =
-                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+                new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
         final ExistsTransactionRequest request2 =
-                new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+                new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
         transaction.recordSuccessfulRequest(successful1);
         final ReadTransactionRequest successful2 =
-                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+                new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
         transaction.recordSuccessfulRequest(successful2);
         transaction.startReconnect();
         transaction.replayMessages(successor.getTransaction(), entries);
@@ -188,9 +189,27 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
         Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
         Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
-        Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
-        Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
-        Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+
+        ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+        Assert.assertNotNull(tmpRead);
+        Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
+        Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
+        Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
+        Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+        tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+        Assert.assertNotNull(tmpRead);
+        Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
+        Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
+        Assert.assertEquals(request1.getPath(), tmpRead.getPath());
+        Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+        final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
+        Assert.assertNotNull(tmpExist);
+        Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
+        Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
+        Assert.assertEquals(request2.getPath(), tmpExist.getPath());
+        Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
     }
 
     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
@@ -215,7 +234,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
     }
 
     protected <T extends TransactionRequest<?>> T testHandleForwardedRemoteRequest(final T request) throws Exception {
-        transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+        transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
         final T received = (T) envelope.getMessage();
         Assert.assertTrue(received.getClass().equals(request.getClass()));
index 7815a262084de004fbff00f1117c6614f28ce16c..dccdf9ef704a4dc6cfde283bf3592e78476c3251 100644 (file)
@@ -14,6 +14,7 @@ import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import java.util.function.Consumer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,7 +64,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final ReadTransactionRequest request =
                 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.handleForwardedRemoteRequest(request, callback);
+        transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());
         final Response<?, ?> value = captor.getValue();
@@ -79,7 +80,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final ExistsTransactionRequest request =
                 new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.handleForwardedRemoteRequest(request, callback);
+        transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());
         final Response<?, ?> value = captor.getValue();
index 68e106a3ff980dcb763dbad53f047e008258cb00..c3dfebd6be3832b8d77b62692e1b14559c07d202 100644 (file)
@@ -11,6 +11,7 @@ import static org.mockito.Mockito.when;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.base.VerifyException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,7 +115,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest
         builder.setSequence(0);
         builder.setAbort();
         final ModifyTransactionRequest request = builder.build();
-        transaction.applyModifyTransactionRequest(request, createCallbackMock());
+        transaction.replayModifyTransactionRequest(request, createCallbackMock(), Ticker.systemTicker().read());
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 
@@ -126,8 +127,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest
         builder.setSequence(0);
         builder.setReady();
         final ModifyTransactionRequest request = builder.build();
-        assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()),
-                VerifyException.class);
+        assertOperationThrowsException(() -> transaction.replayModifyTransactionRequest(request, createCallbackMock(),
+            Ticker.systemTicker().read()), VerifyException.class);
     }
-
 }
\ No newline at end of file
index 390302b539f116bf1a3a6a0eb8716a27323654f3..7eebfe02cad234c588aa2b3e000cc8f10bf3de34 100644 (file)
@@ -16,6 +16,7 @@ import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtil
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.function.Consumer;
 import org.junit.Assert;
@@ -176,7 +177,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         builder.setAbort();
         final ModifyTransactionRequest request = builder.build();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.applyModifyTransactionRequest(request, callback);
+        transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 
@@ -235,7 +236,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         builder.setCommit(coordinated);
         final ModifyTransactionRequest request = builder.build();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.applyModifyTransactionRequest(request, callback);
+        transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
         verify(modification).write(PATH_1, DATA_1);
         verify(modification).merge(PATH_2, DATA_2);
         verify(modification).delete(PATH_3);
index 2876c032e3480299fff3b014a401bed1cd532a9e..b3fb51d4582e4ea6e76a4a224041508e756c1cbd 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import akka.actor.ActorRef;
 import akka.testkit.TestProbe;
 import javax.annotation.Nonnull;
 import org.junit.Assert;
@@ -42,6 +43,10 @@ class TransactionTester<T extends AbstractProxyTransaction> {
         this.backendProbe = backendProbe;
     }
 
+    ActorRef localActor() {
+        return connection.localActor();
+    }
+
     T getTransaction() {
         return transaction;
     }