BUG-8422: Propagate enqueue time 51/56951/4
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 9 May 2017 21:55:31 +0000 (23:55 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 12 May 2017 21:35:18 +0000 (21:35 +0000)
When we are replaying requests onto a connection we really want
to leave their enqueue times intact, so they time out properly.
This codepath is specific for the replay case, hence we do not
want to incur any waiting, either.

This patch introduces enqueueRequest() which does not wait for
the queue duration and audits code paths so they end up talking
to the right method -- either enqueueRequest() or sendRequest().

Change-Id: Ibf97dcc11e32d9ffa911c78ccf0448d6891a9cac
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 17e4759c7561e09786a22210e43b5b32db45149e)

15 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java

index 2423473..d37893b 100644 (file)
@@ -87,6 +87,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return context.self();
     }
 
+    public final long currentTime() {
+        return context.ticker().read();
+    }
+
     /**
      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
      * from any thread.
@@ -98,13 +102,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * @param callback Callback to invoke
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        final RequestException maybePoison = poisoned;
-        if (maybePoison != null) {
-            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+        final long now = currentTime();
+        final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+        try {
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
         }
+    }
 
-        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-        enqueueAndWait(entry, entry.getEnqueuedTicks());
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * <p>
+     * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
+     * should never be called from an application thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
+     */
+    public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
     public abstract Optional<T> getBackendInfo();
@@ -122,19 +144,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, readTime());
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
 
-    private long readTime() {
-        return context.ticker().read();
-    }
-
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
+            final RequestException maybePoison = poisoned;
+            if (maybePoison != null) {
+                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+            }
+
             if (queue.isEmpty()) {
                 // The queue is becoming non-empty, schedule a timer
                 scheduleTimer(REQUEST_TIMEOUT_DURATION);
@@ -145,15 +168,6 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    final void enqueueAndWait(final ConnectionEntry entry, final long now) {
-        final long delay = enqueueEntry(entry, now);
-        try {
-            TimeUnit.NANOSECONDS.sleep(delay);
-        } catch (InterruptedException e) {
-            LOG.debug("Interrupted while sleeping", e);
-        }
-    }
-
     final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
         lock.lock();
         try {
@@ -197,7 +211,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         lock.lock();
         try {
             haveTimer = false;
-            final long now = readTime();
+            final long now = currentTime();
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             final long ticksSinceProgress = queue.ticksStalling(now);
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
@@ -284,7 +298,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
-        final long now = readTime();
+        final long now = currentTime();
 
         final Optional<TransmittedConnectionEntry> maybeEntry;
         lock.lock();
index 5475377..8d769b2 100644 (file)
@@ -50,7 +50,7 @@ public class ConnectionEntry implements Immutable {
         callback.accept(response);
     }
 
-    final long getEnqueuedTicks() {
+    public final long getEnqueuedTicks() {
         return enqueuedTicks;
     }
 
@@ -60,6 +60,6 @@ public class ConnectionEntry implements Immutable {
     }
 
     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return toStringHelper.add("request", request);
+        return toStringHelper.add("request", request).add("enqueuedTicks", enqueuedTicks);
     }
 }
index d384ba4..9ab80d0 100644 (file)
@@ -159,6 +159,9 @@ abstract class TransmitQueue {
             return 0;
         }
 
+        // XXX: we should place a guard against incorrect entry sequences:
+        // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
+
         // Reserve an entry before we do anything that can fail
         final long delay = tracker.openTask(now);
         if (canTransmitCount(inflight.size()) <= 0) {
index 83ba07b..46af74a 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -229,6 +230,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return doRead(path);
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback);
+        parent.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
         parent.sendRequest(request, callback);
@@ -324,10 +331,15 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            purge();
+            sendPurge();
         });
     }
 
+    final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
+            enqueuedTicks);
+    }
+
     final void sendAbort(final Consumer<Response<?, ?>> callback) {
         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
     }
@@ -357,7 +369,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    purge();
+                    sendPurge();
                 });
 
                 return ret;
@@ -451,11 +463,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
-            purge();
+            sendPurge();
         });
     }
 
-    void purge() {
+    final void sendPurge() {
         successfulRequests.clear();
 
         final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
@@ -465,6 +477,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
+    final void enqueuePurge(final long enqueuedTicks) {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        enqueueRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
+            parent.completeTransaction(this);
+        }, enqueuedTicks);
+    }
+
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
@@ -489,17 +511,26 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         local.setSuccessor(successor);
 
         // Replay successful requests first
-        for (Object obj : successfulRequests) {
-            if (obj instanceof TransactionRequest) {
-                LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                successor.replay((TransactionRequest<?>) obj, response -> { });
-            } else {
-                Verify.verify(obj instanceof IncrementSequence);
-                successor.incrementSequence(((IncrementSequence) obj).getDelta());
+        if (!successfulRequests.isEmpty()) {
+            // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+            // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+            // time. We will pick the time of the first entry available. If there is none, we will just use current
+            // time, as all other requests will get enqueued afterwards.
+            final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+            final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+            for (Object obj : successfulRequests) {
+                if (obj instanceof TransactionRequest) {
+                    LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+                    successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+                } else {
+                    Verify.verify(obj instanceof IncrementSequence);
+                    successor.incrementSequence(((IncrementSequence) obj).getDelta());
+                }
             }
+            LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+            successfulRequests.clear();
         }
-        LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
-        successfulRequests.clear();
 
         // Now replay whatever is in the connection
         final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
@@ -509,8 +540,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
-                LOG.debug("Forwarding queued request {} to successor {}", req, successor);
-                successor.replay((TransactionRequest<?>) req, e.getCallback());
+                LOG.debug("Replaying queued request {} to successor {}", req, successor);
+                successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
                 it.remove();
             }
         }
@@ -537,12 +568,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks ticker-based time stamp when the request was enqueued
      */
-    private void replay(TransactionRequest<?> request, Consumer<Response<?, ?>> callback) {
+    private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
         if (request instanceof AbstractLocalTransactionRequest) {
-            handleForwardedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback);
+            handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
         } else {
-            handleForwardedRemoteRequest(request, callback);
+            handleReplayedRemoteRequest(request, callback, enqueuedTicks);
         }
     }
 
@@ -563,8 +596,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Original callback
      */
     final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
-        final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
+        forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback);
+    }
 
+    final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
         if (successor instanceof LocalProxyTransaction) {
             forwardToLocal((LocalProxyTransaction)successor, request, callback);
         } else if (successor instanceof RemoteProxyTransaction) {
@@ -615,9 +651,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks Time stamp to use for enqueue time
      */
-    abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest<?> request,
-            @Nullable Consumer<Response<?, ?>> callback);
+    abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     /**
      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
@@ -627,9 +664,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      *
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
+     * @param enqueuedTicks Time stamp to use for enqueue time
      */
-    abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
-            @Nullable Consumer<Response<?, ?>> callback);
+    abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
 
     @Override
     public final String toString() {
index f2734ae..a518c55 100644 (file)
@@ -55,7 +55,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
     }
 
-
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
         final Request<? , ?> request = entry.getRequest();
@@ -76,8 +75,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
                 throw new CohortNotFoundException(historyId);
             }
 
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
             cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
         } catch (RequestException e) {
             entry.complete(request.toRequestFailure(e));
index 7facc51..b228293 100644 (file)
@@ -72,6 +72,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
 
+    abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
+            @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
     @Override
     final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
@@ -90,30 +93,61 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
         if (request instanceof AbortLocalTransactionRequest) {
-            sendAbort(request, callback);
+            enqueueAbort(request, callback, enqueuedTicks);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
     }
 
-    @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+    private boolean handleReadRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
-        if (request instanceof ModifyTransactionRequest) {
-            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
-        } else if (request instanceof ReadTransactionRequest) {
+        if (request instanceof ReadTransactionRequest) {
             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
             final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
             callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            return true;
         } else if (request instanceof ExistsTransactionRequest) {
             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
             final boolean result = readOnlyView().readNode(path).isPresent();
             callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        if (request instanceof ModifyTransactionRequest) {
+            replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
+        } else if (handleReadRequest(request, callback)) {
+            // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            purge();
+            enqueuePurge(enqueuedTicks);
+        } else {
+            throw new IllegalArgumentException("Unhandled request " + request);
+        }
+    }
+
+    /**
+     * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)},
+     * except it is invoked in the forwarding path from
+     * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}.
+     *
+     * @param request Forwarded request
+     * @param callback Callback to be invoked once the request completes
+     */
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        if (request instanceof ModifyTransactionRequest) {
+            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else if (handleReadRequest(request, callback)) {
+            // No-op
+        } else if (request instanceof TransactionPurgeRequest) {
+            sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -153,7 +187,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.purge();
+            successor.sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -165,7 +199,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.purge();
+            successor.sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -176,4 +210,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         sendRequest(request, callback);
     }
+
+    void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        enqueueRequest(request, callback, enqueuedTicks);
+    }
 }
index 0f2d007..b600a66 100644 (file)
@@ -79,10 +79,23 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     @Override
     void applyModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback) {
+        commonModifyTransactionRequest(request, callback);
+        abort();
+    }
+
+    @Override
+    void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        commonModifyTransactionRequest(request, callback);
+        // FIXME: this should go through the enqueueRequest() path
+        abort();
+    }
+
+    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+            final Consumer<Response<?, ?>> callback) {
         Verify.verify(request.getModifications().isEmpty());
 
         final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
         Verify.verify(protocol == PersistenceProtocol.ABORT);
-        abort();
     }
 }
index 720ada3..424a9ea 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
@@ -203,6 +205,18 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     @Override
     void applyModifyTransactionRequest(final ModifyTransactionRequest request,
             final @Nullable Consumer<Response<?, ?>> callback) {
+        commonModifyTransactionRequest(request, callback, this::sendRequest);
+    }
+
+    @Override
+    void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
+    }
+
+    private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback,
+            final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
         for (final TransactionModification mod : request.getModifications()) {
             if (mod instanceof TransactionWrite) {
                 write(mod.getPath(), ((TransactionWrite)mod).getData());
@@ -215,23 +229,23 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             }
         }
 
-        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+        final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
             Verify.verify(callback != null, "Request {} has null callback", request);
             ensureSealed();
 
             switch (maybeProtocol.get()) {
                 case ABORT:
-                    sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
+                    sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
                     break;
                 case READY:
                     // No-op, as we have already issued a seal()
                     break;
                 case SIMPLE:
-                    sendRequest(commitRequest(false), callback);
+                    sendMethod.accept(commitRequest(false), callback);
                     break;
                 case THREE_PHASE:
-                    sendRequest(commitRequest(true), callback);
+                    sendMethod.accept(commitRequest(true), callback);
                     break;
                 default:
                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
@@ -240,18 +254,35 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long now) {
         if (request instanceof CommitLocalTransactionRequest) {
             sendCommit((CommitLocalTransactionRequest) request, callback);
         } else {
-            super.handleForwardedLocalRequest(request, callback);
+            super.handleReplayedLocalRequest(request, callback, now);
         }
     }
 
     @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
-            final @Nullable Consumer<Response<?, ?>> callback) {
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        LOG.debug("Applying replayed request {}", request);
+
+        if (request instanceof TransactionPreCommitRequest) {
+            enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionAbortRequest) {
+            enqueueAbort(callback, enqueuedTicks);
+        } else {
+            super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
+        }
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         LOG.debug("Applying forwarded request {}", request);
 
         if (request instanceof TransactionPreCommitRequest) {
@@ -283,6 +314,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         closedException = this::abortedException;
     }
 
+    @Override
+    void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        super.enqueueAbort(request, callback, enqueuedTicks);
+        closedException = this::abortedException;
+    }
+
     private CursorAwareDataTreeModification getModification() {
         if (closedException != null) {
             throw closedException.get();
index 2a21b8e..8c3b485 100644 (file)
@@ -327,6 +327,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return identifier;
     }
 
+    final long currentTime() {
+        return connection.currentTime();
+    }
+
     final ActorRef localActor() {
         return connection.localActor();
     }
@@ -391,6 +395,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        connection.enqueueRequest(request, callback, enqueuedTicks);
+    }
+
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         connection.sendRequest(request, callback);
     }
index 192205d..09a8a60 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -97,17 +96,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doDelete(final YangInstanceIdentifier path) {
-        appendModification(new TransactionDelete(path));
+        appendModification(new TransactionDelete(path), Optional.absent());
     }
 
     @Override
     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        appendModification(new TransactionMerge(path, data));
+        appendModification(new TransactionMerge(path, data), Optional.absent());
     }
 
     @Override
     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        appendModification(new TransactionWrite(path, data));
+        appendModification(new TransactionWrite(path, data), Optional.absent());
     }
 
     private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
@@ -158,83 +157,42 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
+    private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
+        if (builderBusy) {
+            flushBuilder(enqueuedTicks);
+        }
+    }
+
     private void flushBuilder() {
+        flushBuilder(Optional.absent());
+    }
+
+    private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
 
-        sendModification(request);
+        sendModification(request, enqueuedTicks);
     }
 
-    private void sendModification(final TransactionRequest<?> request) {
-        sendRequest(request, response -> completeModify(request, response));
-    }
-
-    @Override
-    void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) {
-        if (request instanceof CommitLocalTransactionRequest) {
-            replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback);
-        } else if (request instanceof AbortLocalTransactionRequest) {
-            sendRequest(abortRequest(), callback);
+    private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
+        if (enqueuedTicks.isPresent()) {
+            enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
         } else {
-            throw new IllegalStateException("Unhandled request " + request);
+            sendRequest(request, response -> completeModify(request, response));
         }
     }
 
-    private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
-            final Consumer<Response<?, ?>> callback) {
-        final DataTreeModification mod = request.getModification();
-        mod.applyToCursor(new AbstractDataTreeModificationCursor() {
-            @Override
-            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
-                doWrite(current().node(child), data);
-            }
-
-            @Override
-            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
-                doMerge(current().node(child), data);
-            }
-
-            @Override
-            public void delete(final PathArgument child) {
-                doDelete(current().node(child));
-            }
-        });
-
-        sendRequest(commitRequest(request.isCoordinated()), callback);
-    }
-
-    @Override
-    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
-            final @Nullable Consumer<Response<?, ?>> callback) {
-        nextSequence();
-
-        if (callback == null) {
-            sendModification(request);
-            return;
-        }
-
-        /*
-         * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null
-         * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below.
-         */
-        final Consumer<Response<?, ?>> findBugsIsStupid = callback;
-
-        // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-        //        period required to get into the queue.
-        sendRequest(request, response -> {
-            findBugsIsStupid.accept(Preconditions.checkNotNull(response));
-            completeModify(request, response);
-        });
+    private void appendModification(final TransactionModification modification) {
+        appendModification(modification, Optional.absent());
     }
 
-    private void appendModification(final TransactionModification modification) {
+    private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
         if (operationFailure == null) {
             ensureInitializedBuilder();
 
             builder.addModification(modification);
             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
-                flushBuilder();
+                flushBuilder(enqueuedTicks);
             }
         } else {
             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
@@ -329,7 +287,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         if (builderBusy) {
             final ModifyTransactionRequest request = builder.build();
             builderBusy = false;
-            successor.handleForwardedRemoteRequest(request, null);
+            forwardToSuccessor(successor, request, null);
         }
     }
 
@@ -349,15 +307,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             if (maybeProto.isPresent()) {
                 ensureSealed();
 
+                final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
                     case ABORT:
-                        sendRequest(abortRequest(), callback);
+                        tmp = abortRequest();
+                        sendRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            callback.accept(resp);
+                        });
                         break;
                     case SIMPLE:
-                        sendRequest(commitRequest(false), callback);
+                        tmp = commitRequest(false);
+                        sendRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            callback.accept(resp);
+                        });
                         break;
                     case THREE_PHASE:
-                        sendRequest(commitRequest(true), callback);
+                        tmp = commitRequest(true);
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     case READY:
                         //no op
@@ -369,14 +340,25 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         } else if (request instanceof ReadTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    callback.accept(resp);
+                });
         } else if (request instanceof ExistsTransactionRequest) {
             ensureFlushedBuider();
             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
-                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    callback.accept(resp);
+                });
         } else if (request instanceof TransactionPreCommitRequest) {
             ensureFlushedBuider();
-            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+            final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+                localActor());
+            sendRequest(tmp, resp -> {
+                recordSuccessfulRequest(tmp);
+                callback.accept(resp);
+            });
         } else if (request instanceof TransactionDoCommitRequest) {
             ensureFlushedBuider();
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
@@ -384,7 +366,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            purge();
+            sendPurge();
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -395,4 +377,123 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             final Consumer<Response<?, ?>> callback) {
         successor.handleForwardedRemoteRequest(request, callback);
     }
+
+    @Override
+    void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        if (request instanceof CommitLocalTransactionRequest) {
+            replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            enqueueRequest(abortRequest(), callback, enqueuedTicks);
+        } else {
+            throw new IllegalStateException("Unhandled request " + request);
+        }
+    }
+
+    private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
+            final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        final DataTreeModification mod = request.getModification();
+        final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+        mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+            @Override
+            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                appendModification(new TransactionWrite(current().node(child), data), optTicks);
+            }
+
+            @Override
+            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                appendModification(new TransactionMerge(current().node(child), data), optTicks);
+            }
+
+            @Override
+            public void delete(final PathArgument child) {
+                appendModification(new TransactionDelete(current().node(child)), optTicks);
+            }
+        });
+
+        enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
+    }
+
+    @Override
+    void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+        final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+        if (request instanceof ModifyTransactionRequest) {
+            final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+            for (TransactionModification mod : req.getModifications()) {
+                appendModification(mod, optTicks);
+            }
+
+            final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+            if (maybeProto.isPresent()) {
+                ensureSealed();
+
+                final TransactionRequest<?> tmp;
+                switch (maybeProto.get()) {
+                    case ABORT:
+                        tmp = abortRequest();
+                        enqueueRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case SIMPLE:
+                        tmp = commitRequest(false);
+                        enqueueRequest(tmp, resp -> {
+                            completeModify(tmp, resp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case THREE_PHASE:
+                        tmp = commitRequest(true);
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
+                        break;
+                    case READY:
+                        //no op
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+                }
+            }
+        } else if (request instanceof ReadTransactionRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    cb.accept(resp);
+                }, enqueuedTicks);
+        } else if (request instanceof ExistsTransactionRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+                    recordFinishedRequest();
+                    cb.accept(resp);
+                }, enqueuedTicks);
+        } else if (request instanceof TransactionPreCommitRequest) {
+            ensureFlushedBuider(optTicks);
+            final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+                localActor());
+            enqueueRequest(tmp, resp -> {
+                recordSuccessfulRequest(tmp);
+                cb.accept(resp);
+            }, enqueuedTicks);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+                enqueuedTicks);
+        } else if (request instanceof TransactionAbortRequest) {
+            ensureFlushedBuider(optTicks);
+            enqueueAbort(callback, enqueuedTicks);
+        } else if (request instanceof TransactionPurgeRequest) {
+            enqueuePurge(enqueuedTicks);
+        } else {
+            throw new IllegalArgumentException("Unhandled request {}" + request);
+        }
+    }
 }
index e02701d..9db1907 100644 (file)
@@ -17,6 +17,7 @@ import static org.mockito.Mockito.when;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.primitives.UnsignedLong;
 import java.util.ArrayList;
 import java.util.List;
@@ -169,16 +170,16 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         final List<ConnectionEntry> entries = new ArrayList<>();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
         final ReadTransactionRequest request1 =
-                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+                new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
         final ExistsTransactionRequest request2 =
-                new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+                new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
         transaction.recordSuccessfulRequest(successful1);
         final ReadTransactionRequest successful2 =
-                new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+                new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
         transaction.recordSuccessfulRequest(successful2);
         transaction.startReconnect();
         transaction.replayMessages(successor.getTransaction(), entries);
@@ -188,9 +189,27 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
         Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
         Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
-        Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
-        Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
-        Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+
+        ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+        Assert.assertNotNull(tmpRead);
+        Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
+        Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
+        Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
+        Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+        tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+        Assert.assertNotNull(tmpRead);
+        Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
+        Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
+        Assert.assertEquals(request1.getPath(), tmpRead.getPath());
+        Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+        final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
+        Assert.assertNotNull(tmpExist);
+        Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
+        Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
+        Assert.assertEquals(request2.getPath(), tmpExist.getPath());
+        Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
     }
 
     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
@@ -215,7 +234,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
     }
 
     protected <T extends TransactionRequest<?>> T testHandleForwardedRemoteRequest(final T request) throws Exception {
-        transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+        transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
         final T received = (T) envelope.getMessage();
         Assert.assertTrue(received.getClass().equals(request.getClass()));
index 7815a26..dccdf9e 100644 (file)
@@ -14,6 +14,7 @@ import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import java.util.function.Consumer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,7 +64,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final ReadTransactionRequest request =
                 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.handleForwardedRemoteRequest(request, callback);
+        transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());
         final Response<?, ?> value = captor.getValue();
@@ -79,7 +80,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final ExistsTransactionRequest request =
                 new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.handleForwardedRemoteRequest(request, callback);
+        transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
         final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(callback).accept(captor.capture());
         final Response<?, ?> value = captor.getValue();
index 68e106a..c3dfebd 100644 (file)
@@ -11,6 +11,7 @@ import static org.mockito.Mockito.when;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.base.VerifyException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,7 +115,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest
         builder.setSequence(0);
         builder.setAbort();
         final ModifyTransactionRequest request = builder.build();
-        transaction.applyModifyTransactionRequest(request, createCallbackMock());
+        transaction.replayModifyTransactionRequest(request, createCallbackMock(), Ticker.systemTicker().read());
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 
@@ -126,8 +127,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest
         builder.setSequence(0);
         builder.setReady();
         final ModifyTransactionRequest request = builder.build();
-        assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()),
-                VerifyException.class);
+        assertOperationThrowsException(() -> transaction.replayModifyTransactionRequest(request, createCallbackMock(),
+            Ticker.systemTicker().read()), VerifyException.class);
     }
-
 }
\ No newline at end of file
index 390302b..7eebfe0 100644 (file)
@@ -16,6 +16,7 @@ import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtil
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
 
 import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.function.Consumer;
 import org.junit.Assert;
@@ -176,7 +177,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         builder.setAbort();
         final ModifyTransactionRequest request = builder.build();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.applyModifyTransactionRequest(request, callback);
+        transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 
@@ -235,7 +236,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         builder.setCommit(coordinated);
         final ModifyTransactionRequest request = builder.build();
         final Consumer<Response<?, ?>> callback = createCallbackMock();
-        transaction.applyModifyTransactionRequest(request, callback);
+        transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
         verify(modification).write(PATH_1, DATA_1);
         verify(modification).merge(PATH_2, DATA_2);
         verify(modification).delete(PATH_3);
index 2876c03..b3fb51d 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import akka.actor.ActorRef;
 import akka.testkit.TestProbe;
 import javax.annotation.Nonnull;
 import org.junit.Assert;
@@ -42,6 +43,10 @@ class TransactionTester<T extends AbstractProxyTransaction> {
         this.backendProbe = backendProbe;
     }
 
+    ActorRef localActor() {
+        return connection.localActor();
+    }
+
     T getTransaction() {
         return transaction;
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.