From e1c283de301355cb8fa3f7d4fa28a6dd0af501eb Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 9 May 2017 23:55:31 +0200 Subject: [PATCH] BUG-8422: Propagate enqueue time When we are replaying requests onto a connection we really want to leave their enqueue times intact, so they time out properly. This codepath is specific for the replay case, hence we do not want to incur any waiting, either. This patch introduces enqueueRequest() which does not wait for the queue duration and audits code paths so they end up talking to the right method -- either enqueueRequest() or sendRequest(). Change-Id: Ibf97dcc11e32d9ffa911c78ccf0448d6891a9cac Signed-off-by: Robert Varga (cherry picked from commit 17e4759c7561e09786a22210e43b5b32db45149e) --- .../client/AbstractClientConnection.java | 56 ++-- .../access/client/ConnectionEntry.java | 4 +- .../cluster/access/client/TransmitQueue.java | 3 + .../actors/dds/AbstractProxyTransaction.java | 84 ++++-- .../dds/BouncingReconnectForwarder.java | 3 - .../actors/dds/LocalProxyTransaction.java | 61 ++++- .../dds/LocalReadOnlyProxyTransaction.java | 15 +- .../dds/LocalReadWriteProxyTransaction.java | 56 +++- .../databroker/actors/dds/ProxyHistory.java | 9 + .../actors/dds/RemoteProxyTransaction.java | 245 +++++++++++++----- .../dds/AbstractProxyTransactionTest.java | 33 ++- .../actors/dds/LocalProxyTransactionTest.java | 5 +- .../LocalReadOnlyProxyTransactionTest.java | 8 +- .../LocalReadWriteProxyTransactionTest.java | 5 +- .../actors/dds/TransactionTester.java | 5 + 15 files changed, 435 insertions(+), 157 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 2423473472..d37893bd7c 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -87,6 +87,10 @@ public abstract class AbstractClientConnection { return context.self(); } + public final long currentTime() { + return context.ticker().read(); + } + /** * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke * from any thread. @@ -98,13 +102,31 @@ public abstract class AbstractClientConnection { * @param callback Callback to invoke */ public final void sendRequest(final Request request, final Consumer> callback) { - final RequestException maybePoison = poisoned; - if (maybePoison != null) { - throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); + final long now = currentTime(); + final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now); + try { + TimeUnit.NANOSECONDS.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now); } + } - final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime()); - enqueueAndWait(entry, entry.getEnqueuedTicks()); + /** + * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke + * from any thread. + * + *

+ * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it + * should never be called from an application thread. + * + * @param request Request to send + * @param callback Callback to invoke + * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue + */ + public final void enqueueRequest(final Request request, final Consumer> callback, + final long enqueuedTicks) { + enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime()); } public abstract Optional getBackendInfo(); @@ -122,19 +144,20 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") final void setForwarder(final ReconnectForwarder forwarder) { - queue.setForwarder(forwarder, readTime()); + queue.setForwarder(forwarder, currentTime()); } @GuardedBy("lock") abstract ClientActorBehavior lockedReconnect(ClientActorBehavior current); - private long readTime() { - return context.ticker().read(); - } - final long enqueueEntry(final ConnectionEntry entry, final long now) { lock.lock(); try { + final RequestException maybePoison = poisoned; + if (maybePoison != null) { + throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); + } + if (queue.isEmpty()) { // The queue is becoming non-empty, schedule a timer scheduleTimer(REQUEST_TIMEOUT_DURATION); @@ -145,15 +168,6 @@ public abstract class AbstractClientConnection { } } - final void enqueueAndWait(final ConnectionEntry entry, final long now) { - final long delay = enqueueEntry(entry, now); - try { - TimeUnit.NANOSECONDS.sleep(delay); - } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping", e); - } - } - final ClientActorBehavior reconnect(final ClientActorBehavior current) { lock.lock(); try { @@ -197,7 +211,7 @@ public abstract class AbstractClientConnection { lock.lock(); try { haveTimer = false; - final long now = readTime(); + final long now = currentTime(); // The following line is only reliable when queue is not forwarding, but such state should not last long. final long ticksSinceProgress = queue.ticksStalling(now); if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { @@ -284,7 +298,7 @@ public abstract class AbstractClientConnection { } final void receiveResponse(final ResponseEnvelope envelope) { - final long now = readTime(); + final long now = currentTime(); final Optional maybeEntry; lock.lock(); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java index 547537764a..8d769b2738 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java @@ -50,7 +50,7 @@ public class ConnectionEntry implements Immutable { callback.accept(response); } - final long getEnqueuedTicks() { + public final long getEnqueuedTicks() { return enqueuedTicks; } @@ -60,6 +60,6 @@ public class ConnectionEntry implements Immutable { } ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - return toStringHelper.add("request", request); + return toStringHelper.add("request", request).add("enqueuedTicks", enqueuedTicks); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index d384ba4736..9ab80d0d00 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -159,6 +159,9 @@ abstract class TransmitQueue { return 0; } + // XXX: we should place a guard against incorrect entry sequences: + // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues + // Reserve an entry before we do anything that can fail final long delay = tracker.openTask(now); if (canTransmitCount(inflight.size()) <= 0) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 83ba07b69a..46af74a563 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -13,6 +13,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.base.Verify; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -229,6 +230,12 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback); + parent.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); parent.sendRequest(request, callback); @@ -324,10 +331,15 @@ abstract class AbstractProxyTransaction implements Identifiable> callback, final long enqueuedTicks) { + enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } + final void sendAbort(final Consumer> callback) { sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); } @@ -357,7 +369,7 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); @@ -465,6 +477,16 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + enqueueRequest(req, t -> { + LOG.debug("Transaction {} purge completed", this); + parent.completeTransaction(this); + }, enqueuedTicks); + } + // Called with the connection unlocked final synchronized void startReconnect() { // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous @@ -489,17 +511,26 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, response -> { }); - } else { - Verify.verify(obj instanceof IncrementSequence); - successor.incrementSequence(((IncrementSequence) obj).getDelta()); + if (!successfulRequests.isEmpty()) { + // We need to find a good timestamp to use for successful requests, as we do not want to time them out + // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue + // time. We will pick the time of the first entry available. If there is none, we will just use current + // time, as all other requests will get enqueued afterwards. + final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null); + final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime(); + + for (Object obj : successfulRequests) { + if (obj instanceof TransactionRequest) { + LOG.debug("Forwarding successful request {} to successor {}", obj, successor); + successor.replayRequest((TransactionRequest) obj, resp -> { }, now); + } else { + Verify.verify(obj instanceof IncrementSequence); + successor.incrementSequence(((IncrementSequence) obj).getDelta()); + } } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); } - LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); - successfulRequests.clear(); // Now replay whatever is in the connection final Iterator it = enqueuedEntries.iterator(); @@ -509,8 +540,8 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback()); + LOG.debug("Replaying queued request {} to successor {}", req, successor); + successor.replayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -537,12 +568,14 @@ abstract class AbstractProxyTransaction implements Identifiable request, Consumer> callback) { + private void replayRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { - handleForwardedLocalRequest((AbstractLocalTransactionRequest) request, callback); + handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); } else { - handleForwardedRemoteRequest(request, callback); + handleReplayedRemoteRequest(request, callback, enqueuedTicks); } } @@ -563,8 +596,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) { - final AbstractProxyTransaction successor = getSuccessorState().getSuccessor(); + forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback); + } + final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (successor instanceof LocalProxyTransaction) { forwardToLocal((LocalProxyTransaction)successor, request, callback); } else if (successor instanceof RemoteProxyTransaction) { @@ -615,9 +651,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); /** * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. @@ -627,9 +664,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, - @Nullable Consumer> callback); + abstract void handleReplayedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); @Override public final String toString() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java index f2734ae15d..a518c55169 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java @@ -55,7 +55,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder { HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier)); } - @Override protected void forwardEntry(final ConnectionEntry entry, final long now) { final Request request = entry.getRequest(); @@ -76,8 +75,6 @@ final class BouncingReconnectForwarder extends ReconnectForwarder { throw new CohortNotFoundException(historyId); } - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor); } catch (RequestException e) { entry.complete(request.toRequestFailure(e)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 7facc5160a..b228293703 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -72,6 +72,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); + abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); + @Override final CheckedFuture doExists(final YangInstanceIdentifier path) { return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); @@ -90,30 +93,61 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } @Override - void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, - final Consumer> callback) { + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { if (request instanceof AbortLocalTransactionRequest) { - sendAbort(request, callback); + enqueueAbort(request, callback, enqueuedTicks); } else { throw new IllegalArgumentException("Unhandled request" + request); } } - @Override - void handleForwardedRemoteRequest(final TransactionRequest request, + private boolean handleReadRequest(final TransactionRequest request, final @Nullable Consumer> callback) { - if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); final Optional> result = readOnlyView().readNode(path); callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + return true; } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); final boolean result = readOnlyView().readNode(path).isPresent(); callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + return true; + } else { + return false; + } + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + if (request instanceof ModifyTransactionRequest) { + replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); + } else if (handleReadRequest(request, callback)) { + // No-op } else if (request instanceof TransactionPurgeRequest) { - purge(); + enqueuePurge(enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + + /** + * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)}, + * except it is invoked in the forwarding path from + * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}. + * + * @param request Forwarded request + * @param callback Callback to be invoked once the request completes + */ + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + sendPurge(); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -153,7 +187,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { successor.abort(); } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); - successor.purge(); + successor.sendPurge(); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -165,7 +199,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.purge(); + successor.sendPurge(); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -176,4 +210,9 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); } + + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + enqueueRequest(request, callback, enqueuedTicks); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java index 0f2d007716..b600a66ea6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -79,10 +79,23 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { @Override void applyModifyTransactionRequest(final ModifyTransactionRequest request, final Consumer> callback) { + commonModifyTransactionRequest(request, callback); + abort(); + } + + @Override + void replayModifyTransactionRequest(final ModifyTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + commonModifyTransactionRequest(request, callback); + // FIXME: this should go through the enqueueRequest() path + abort(); + } + + private static void commonModifyTransactionRequest(final ModifyTransactionRequest request, + final Consumer> callback) { Verify.verify(request.getModifications().isEmpty()); final PersistenceProtocol protocol = request.getPersistenceProtocol().get(); Verify.verify(protocol == PersistenceProtocol.ABORT); - abort(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 720ada3191..424a9ea0db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Optional; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -203,6 +205,18 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { @Override void applyModifyTransactionRequest(final ModifyTransactionRequest request, final @Nullable Consumer> callback) { + commonModifyTransactionRequest(request, callback, this::sendRequest); + } + + @Override + void replayModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks)); + } + + private void commonModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback, + final BiConsumer, Consumer>> sendMethod) { for (final TransactionModification mod : request.getModifications()) { if (mod instanceof TransactionWrite) { write(mod.getPath(), ((TransactionWrite)mod).getData()); @@ -215,23 +229,23 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } } - final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); + final Optional maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { Verify.verify(callback != null, "Request {} has null callback", request); ensureSealed(); switch (maybeProtocol.get()) { case ABORT: - sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); + sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); break; case READY: // No-op, as we have already issued a seal() break; case SIMPLE: - sendRequest(commitRequest(false), callback); + sendMethod.accept(commitRequest(false), callback); break; case THREE_PHASE: - sendRequest(commitRequest(true), callback); + sendMethod.accept(commitRequest(true), callback); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); @@ -240,18 +254,35 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, - final Consumer> callback) { + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long now) { if (request instanceof CommitLocalTransactionRequest) { sendCommit((CommitLocalTransactionRequest) request, callback); } else { - super.handleForwardedLocalRequest(request, callback); + super.handleReplayedLocalRequest(request, callback, now); } } @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + LOG.debug("Applying replayed request {}", request); + + if (request instanceof TransactionPreCommitRequest) { + enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionDoCommitRequest) { + enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionAbortRequest) { + enqueueAbort(callback, enqueuedTicks); + } else { + super.handleReplayedRemoteRequest(request, callback, enqueuedTicks); + } + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { LOG.debug("Applying forwarded request {}", request); if (request instanceof TransactionPreCommitRequest) { @@ -283,6 +314,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { closedException = this::abortedException; } + @Override + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + super.enqueueAbort(request, callback, enqueuedTicks); + closedException = this::abortedException; + } + private CursorAwareDataTreeModification getModification() { if (closedException != null) { throw closedException.get(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 2a21b8e858..8c3b485134 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -327,6 +327,10 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final long currentTime() { + return connection.currentTime(); + } + final ActorRef localActor() { return connection.localActor(); } @@ -391,6 +395,11 @@ abstract class ProxyHistory implements Identifiable { } } + final void enqueueRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + connection.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { connection.sendRequest(request, callback); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 192205dc0a..09a8a60563 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -97,17 +96,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path)); + appendModification(new TransactionDelete(path), Optional.absent()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data)); + appendModification(new TransactionMerge(path, data), Optional.absent()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data)); + appendModification(new TransactionWrite(path, data), Optional.absent()); } private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, @@ -158,83 +157,42 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } + private void ensureFlushedBuider(final Optional enqueuedTicks) { + if (builderBusy) { + flushBuilder(enqueuedTicks); + } + } + private void flushBuilder() { + flushBuilder(Optional.absent()); + } + + private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; - sendModification(request); + sendModification(request, enqueuedTicks); } - private void sendModification(final TransactionRequest request) { - sendRequest(request, response -> completeModify(request, response)); - } - - @Override - void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, - final Consumer> callback) { - if (request instanceof CommitLocalTransactionRequest) { - replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback); - } else if (request instanceof AbortLocalTransactionRequest) { - sendRequest(abortRequest(), callback); + private void sendModification(final TransactionRequest request, final Optional enqueuedTicks) { + if (enqueuedTicks.isPresent()) { + enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue()); } else { - throw new IllegalStateException("Unhandled request " + request); + sendRequest(request, response -> completeModify(request, response)); } } - private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, - final Consumer> callback) { - final DataTreeModification mod = request.getModification(); - mod.applyToCursor(new AbstractDataTreeModificationCursor() { - @Override - public void write(final PathArgument child, final NormalizedNode data) { - doWrite(current().node(child), data); - } - - @Override - public void merge(final PathArgument child, final NormalizedNode data) { - doMerge(current().node(child), data); - } - - @Override - public void delete(final PathArgument child) { - doDelete(current().node(child)); - } - }); - - sendRequest(commitRequest(request.isCoordinated()), callback); - } - - @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { - nextSequence(); - - if (callback == null) { - sendModification(request); - return; - } - - /* - * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null - * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below. - */ - final Consumer> findBugsIsStupid = callback; - - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. - sendRequest(request, response -> { - findBugsIsStupid.accept(Preconditions.checkNotNull(response)); - completeModify(request, response); - }); + private void appendModification(final TransactionModification modification) { + appendModification(modification, Optional.absent()); } - private void appendModification(final TransactionModification modification) { + private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { if (operationFailure == null) { ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { - flushBuilder(); + flushBuilder(enqueuedTicks); } } else { LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier()); @@ -329,7 +287,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (builderBusy) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; - successor.handleForwardedRemoteRequest(request, null); + forwardToSuccessor(successor, request, null); } } @@ -349,15 +307,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (maybeProto.isPresent()) { ensureSealed(); + final TransactionRequest tmp; switch (maybeProto.get()) { case ABORT: - sendRequest(abortRequest(), callback); + tmp = abortRequest(); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); break; case SIMPLE: - sendRequest(commitRequest(false), callback); + tmp = commitRequest(false); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); break; case THREE_PHASE: - sendRequest(commitRequest(true), callback); + tmp = commitRequest(true); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); break; case READY: //no op @@ -369,14 +340,25 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + callback.accept(resp); + }); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + callback.accept(resp); + }); } else if (request instanceof TransactionPreCommitRequest) { ensureFlushedBuider(); - sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); } else if (request instanceof TransactionDoCommitRequest) { ensureFlushedBuider(); sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); @@ -384,7 +366,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - purge(); + sendPurge(); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -395,4 +377,123 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Consumer> callback) { successor.handleForwardedRemoteRequest(request, callback); } + + @Override + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks); + } else if (request instanceof AbortLocalTransactionRequest) { + enqueueRequest(abortRequest(), callback, enqueuedTicks); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + final DataTreeModification mod = request.getModification(); + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionWrite(current().node(child), data), optTicks); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + appendModification(new TransactionMerge(current().node(child), data), optTicks); + } + + @Override + public void delete(final PathArgument child) { + appendModification(new TransactionDelete(current().node(child)), optTicks); + } + }); + + enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks); + } + + @Override + void handleReplayedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback, final long enqueuedTicks) { + final Consumer> cb = callback != null ? callback : resp -> { }; + final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + for (TransactionModification mod : req.getModifications()) { + appendModification(mod, optTicks); + } + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + ensureSealed(); + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case SIMPLE: + tmp = commitRequest(false); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case THREE_PHASE: + tmp = commitRequest(true); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + case READY: + //no op + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + recordFinishedRequest(); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(optTicks); + final TransactionRequest tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(optTicks); + enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, + enqueuedTicks); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(optTicks); + enqueueAbort(callback, enqueuedTicks); + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java index e02701d260..9db19077e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; +import com.google.common.base.Ticker; import com.google.common.primitives.UnsignedLong; import java.util.ArrayList; import java.util.List; @@ -169,16 +170,16 @@ public abstract class AbstractProxyTransactionTest entries = new ArrayList<>(); final Consumer> callback = createCallbackMock(); final ReadTransactionRequest request1 = - new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true); + new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true); final ExistsTransactionRequest request2 = - new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true); + new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true); entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L)); entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L)); final TransactionTester successor = createRemoteProxyTransactionTester(); final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref()); transaction.recordSuccessfulRequest(successful1); final ReadTransactionRequest successful2 = - new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); + new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true); transaction.recordSuccessfulRequest(successful2); transaction.startReconnect(); transaction.replayMessages(successor.getTransaction(), entries); @@ -188,9 +189,27 @@ public abstract class AbstractProxyTransactionTest> T testHandleForwardedRemoteRequest(final T request) throws Exception { - transaction.handleForwardedRemoteRequest(request, createCallbackMock()); + transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read()); final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class); final T received = (T) envelope.getMessage(); Assert.assertTrue(received.getClass().equals(request.getClass())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java index 7815a26208..dccdf9ef70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals; import akka.testkit.TestProbe; +import com.google.common.base.Ticker; import java.util.function.Consumer; import org.junit.Assert; import org.junit.Test; @@ -63,7 +64,7 @@ public abstract class LocalProxyTransactionTest final ReadTransactionRequest request = new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); final Consumer> callback = createCallbackMock(); - transaction.handleForwardedRemoteRequest(request, callback); + transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read()); final ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(callback).accept(captor.capture()); final Response value = captor.getValue(); @@ -79,7 +80,7 @@ public abstract class LocalProxyTransactionTest final ExistsTransactionRequest request = new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); final Consumer> callback = createCallbackMock(); - transaction.handleForwardedRemoteRequest(request, callback); + transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read()); final ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(callback).accept(captor.capture()); final Response value = captor.getValue(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java index 68e106a3ff..c3dfebd6be 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransactionTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.when; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException; import akka.testkit.TestProbe; +import com.google.common.base.Ticker; import com.google.common.base.VerifyException; import org.junit.Assert; import org.junit.Test; @@ -114,7 +115,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest builder.setSequence(0); builder.setAbort(); final ModifyTransactionRequest request = builder.build(); - transaction.applyModifyTransactionRequest(request, createCallbackMock()); + transaction.replayModifyTransactionRequest(request, createCallbackMock(), Ticker.systemTicker().read()); getTester().expectTransactionRequest(AbortLocalTransactionRequest.class); } @@ -126,8 +127,7 @@ public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest builder.setSequence(0); builder.setReady(); final ModifyTransactionRequest request = builder.build(); - assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()), - VerifyException.class); + assertOperationThrowsException(() -> transaction.replayModifyTransactionRequest(request, createCallbackMock(), + Ticker.systemTicker().read()), VerifyException.class); } - } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java index 390302b539..7eebfe02ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java @@ -16,6 +16,7 @@ import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtil import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException; import akka.testkit.TestProbe; +import com.google.common.base.Ticker; import com.google.common.util.concurrent.ListenableFuture; import java.util.function.Consumer; import org.junit.Assert; @@ -176,7 +177,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes builder.setAbort(); final ModifyTransactionRequest request = builder.build(); final Consumer> callback = createCallbackMock(); - transaction.applyModifyTransactionRequest(request, callback); + transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read()); getTester().expectTransactionRequest(AbortLocalTransactionRequest.class); } @@ -235,7 +236,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes builder.setCommit(coordinated); final ModifyTransactionRequest request = builder.build(); final Consumer> callback = createCallbackMock(); - transaction.applyModifyTransactionRequest(request, callback); + transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read()); verify(modification).write(PATH_1, DATA_1); verify(modification).merge(PATH_2, DATA_2); verify(modification).delete(PATH_3); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java index 2876c032e3..b3fb51d458 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/TransactionTester.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import akka.actor.ActorRef; import akka.testkit.TestProbe; import javax.annotation.Nonnull; import org.junit.Assert; @@ -42,6 +43,10 @@ class TransactionTester { this.backendProbe = backendProbe; } + ActorRef localActor() { + return connection.localActor(); + } + T getTransaction() { return transaction; } -- 2.36.6