return context.self();
}
+ public final long currentTime() {
+ return context.ticker().read();
+ }
+
/**
* Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
* from any thread.
* @param callback Callback to invoke
*/
public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
- final RequestException maybePoison = poisoned;
- if (maybePoison != null) {
- throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ final long now = currentTime();
+ final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+ try {
+ TimeUnit.NANOSECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
}
+ }
- final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
- enqueueAndWait(entry, entry.getEnqueuedTicks());
+ /**
+ * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+ * from any thread.
+ *
+ * <p>
+ * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
+ * should never be called from an application thread.
+ *
+ * @param request Request to send
+ * @param callback Callback to invoke
+ * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
+ */
+ public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
}
public abstract Optional<T> getBackendInfo();
@GuardedBy("lock")
final void setForwarder(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder, readTime());
+ queue.setForwarder(forwarder, currentTime());
}
@GuardedBy("lock")
abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
- private long readTime() {
- return context.ticker().read();
- }
-
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
+ final RequestException maybePoison = poisoned;
+ if (maybePoison != null) {
+ throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ }
+
if (queue.isEmpty()) {
// The queue is becoming non-empty, schedule a timer
scheduleTimer(REQUEST_TIMEOUT_DURATION);
}
}
- final void enqueueAndWait(final ConnectionEntry entry, final long now) {
- final long delay = enqueueEntry(entry, now);
- try {
- TimeUnit.NANOSECONDS.sleep(delay);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping", e);
- }
- }
-
final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
lock.lock();
try {
lock.lock();
try {
haveTimer = false;
- final long now = readTime();
+ final long now = currentTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
final long ticksSinceProgress = queue.ticksStalling(now);
if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
}
final void receiveResponse(final ResponseEnvelope<?> envelope) {
- final long now = readTime();
+ final long now = currentTime();
final Optional<TransmittedConnectionEntry> maybeEntry;
lock.lock();
callback.accept(response);
}
- final long getEnqueuedTicks() {
+ public final long getEnqueuedTicks() {
return enqueuedTicks;
}
}
ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return toStringHelper.add("request", request);
+ return toStringHelper.add("request", request).add("enqueuedTicks", enqueuedTicks);
}
}
return 0;
}
+ // XXX: we should place a guard against incorrect entry sequences:
+ // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
+
// Reserve an entry before we do anything that can fail
final long delay = tracker.openTask(now);
if (canTransmitCount(inflight.size()) <= 0) {
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
return doRead(path);
}
+ final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback);
+ parent.enqueueRequest(request, callback, enqueuedTicks);
+ }
+
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
parent.sendRequest(request, callback);
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} abort completed", this);
- purge();
+ sendPurge();
});
}
+ final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
+ enqueuedTicks);
+ }
+
final void sendAbort(final Consumer<Response<?, ?>> callback) {
sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
}
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} directCommit completed", this);
- purge();
+ sendPurge();
});
return ret;
}
LOG.debug("Transaction {} doCommit completed", this);
- purge();
+ sendPurge();
});
}
- void purge() {
+ final void sendPurge() {
successfulRequests.clear();
final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
});
}
+ final void enqueuePurge(final long enqueuedTicks) {
+ successfulRequests.clear();
+
+ final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+ enqueueRequest(req, t -> {
+ LOG.debug("Transaction {} purge completed", this);
+ parent.completeTransaction(this);
+ }, enqueuedTicks);
+ }
+
// Called with the connection unlocked
final synchronized void startReconnect() {
// At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
local.setSuccessor(successor);
// Replay successful requests first
- for (Object obj : successfulRequests) {
- if (obj instanceof TransactionRequest) {
- LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.replay((TransactionRequest<?>) obj, response -> { });
- } else {
- Verify.verify(obj instanceof IncrementSequence);
- successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ if (!successfulRequests.isEmpty()) {
+ // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+ // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+ // time. We will pick the time of the first entry available. If there is none, we will just use current
+ // time, as all other requests will get enqueued afterwards.
+ final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+ final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+ successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ }
}
+ LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+ successfulRequests.clear();
}
- LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
- successfulRequests.clear();
// Now replay whatever is in the connection
final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
if (getIdentifier().equals(req.getTarget())) {
Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
- LOG.debug("Forwarding queued request {} to successor {}", req, successor);
- successor.replay((TransactionRequest<?>) req, e.getCallback());
+ LOG.debug("Replaying queued request {} to successor {}", req, successor);
+ successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
}
}
*
* @param request Request which needs to be forwarded
* @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks ticker-based time stamp when the request was enqueued
*/
- private void replay(TransactionRequest<?> request, Consumer<Response<?, ?>> callback) {
+ private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
if (request instanceof AbstractLocalTransactionRequest) {
- handleForwardedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback);
+ handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
} else {
- handleForwardedRemoteRequest(request, callback);
+ handleReplayedRemoteRequest(request, callback, enqueuedTicks);
}
}
* @param callback Original callback
*/
final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
- final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
+ forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback);
+ }
+ final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
if (successor instanceof LocalProxyTransaction) {
forwardToLocal((LocalProxyTransaction)successor, request, callback);
} else if (successor instanceof RemoteProxyTransaction) {
*
* @param request Request which needs to be forwarded
* @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks Time stamp to use for enqueue time
*/
- abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest<?> request,
- @Nullable Consumer<Response<?, ?>> callback);
+ abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest<?> request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
/**
* Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
*
* @param request Request which needs to be forwarded
* @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks Time stamp to use for enqueue time
*/
- abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
- @Nullable Consumer<Response<?, ?>> callback);
+ abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
@Override
public final String toString() {
HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
}
-
@Override
protected void forwardEntry(final ConnectionEntry entry, final long now) {
final Request<? , ?> request = entry.getRequest();
throw new CohortNotFoundException(historyId);
}
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
} catch (RequestException e) {
entry.complete(request.toRequestFailure(e));
abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
+ abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
@Override
final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
}
@Override
- void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) {
+ void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
if (request instanceof AbortLocalTransactionRequest) {
- sendAbort(request, callback);
+ enqueueAbort(request, callback, enqueuedTicks);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
}
- @Override
- void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ private boolean handleReadRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
final boolean result = readOnlyView().readNode(path).isPresent();
callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ if (request instanceof ModifyTransactionRequest) {
+ replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
} else if (request instanceof TransactionPurgeRequest) {
- purge();
+ enqueuePurge(enqueuedTicks);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+
+ /**
+ * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)},
+ * except it is invoked in the forwarding path from
+ * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}.
+ *
+ * @param request Forwarded request
+ * @param callback Callback to be invoked once the request completes
+ */
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ if (request instanceof ModifyTransactionRequest) {
+ applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
+ } else if (request instanceof TransactionPurgeRequest) {
+ sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
sendRequest(request, callback);
}
+
+ void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ enqueueRequest(request, callback, enqueuedTicks);
+ }
}
@Override
void applyModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback) {
+ commonModifyTransactionRequest(request, callback);
+ abort();
+ }
+
+ @Override
+ void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ commonModifyTransactionRequest(request, callback);
+ // FIXME: this should go through the enqueueRequest() path
+ abort();
+ }
+
+ private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+ final Consumer<Response<?, ?>> callback) {
Verify.verify(request.getModifications().isEmpty());
final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
Verify.verify(protocol == PersistenceProtocol.ABORT);
- abort();
}
}
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Optional;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@Override
void applyModifyTransactionRequest(final ModifyTransactionRequest request,
final @Nullable Consumer<Response<?, ?>> callback) {
+ commonModifyTransactionRequest(request, callback, this::sendRequest);
+ }
+
+ @Override
+ void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
+ }
+
+ private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
+ final @Nullable Consumer<Response<?, ?>> callback,
+ final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
for (final TransactionModification mod : request.getModifications()) {
if (mod instanceof TransactionWrite) {
write(mod.getPath(), ((TransactionWrite)mod).getData());
}
}
- final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+ final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
if (maybeProtocol.isPresent()) {
Verify.verify(callback != null, "Request {} has null callback", request);
ensureSealed();
switch (maybeProtocol.get()) {
case ABORT:
- sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
+ sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
break;
case READY:
// No-op, as we have already issued a seal()
break;
case SIMPLE:
- sendRequest(commitRequest(false), callback);
+ sendMethod.accept(commitRequest(false), callback);
break;
case THREE_PHASE:
- sendRequest(commitRequest(true), callback);
+ sendMethod.accept(commitRequest(true), callback);
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
}
@Override
- void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) {
+ void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback, final long now) {
if (request instanceof CommitLocalTransactionRequest) {
sendCommit((CommitLocalTransactionRequest) request, callback);
} else {
- super.handleForwardedLocalRequest(request, callback);
+ super.handleReplayedLocalRequest(request, callback, now);
}
}
@Override
- void handleForwardedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback) {
+ void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ LOG.debug("Applying replayed request {}", request);
+
+ if (request instanceof TransactionPreCommitRequest) {
+ enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+ enqueuedTicks);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+ enqueuedTicks);
+ } else if (request instanceof TransactionAbortRequest) {
+ enqueueAbort(callback, enqueuedTicks);
+ } else {
+ super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
+ }
+ }
+
+ @Override
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
LOG.debug("Applying forwarded request {}", request);
if (request instanceof TransactionPreCommitRequest) {
closedException = this::abortedException;
}
+ @Override
+ void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ super.enqueueAbort(request, callback, enqueuedTicks);
+ closedException = this::abortedException;
+ }
+
private CursorAwareDataTreeModification getModification() {
if (closedException != null) {
throw closedException.get();
return identifier;
}
+ final long currentTime() {
+ return connection.currentTime();
+ }
+
final ActorRef localActor() {
return connection.localActor();
}
}
}
+ final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ connection.enqueueRequest(request, callback, enqueuedTicks);
+ }
+
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
connection.sendRequest(request, callback);
}
package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@Override
void doDelete(final YangInstanceIdentifier path) {
- appendModification(new TransactionDelete(path));
+ appendModification(new TransactionDelete(path), Optional.absent());
}
@Override
void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- appendModification(new TransactionMerge(path, data));
+ appendModification(new TransactionMerge(path, data), Optional.absent());
}
@Override
void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- appendModification(new TransactionWrite(path, data));
+ appendModification(new TransactionWrite(path, data), Optional.absent());
}
private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
}
}
+ private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
+ if (builderBusy) {
+ flushBuilder(enqueuedTicks);
+ }
+ }
+
private void flushBuilder() {
+ flushBuilder(Optional.absent());
+ }
+
+ private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
- sendModification(request);
+ sendModification(request, enqueuedTicks);
}
- private void sendModification(final TransactionRequest<?> request) {
- sendRequest(request, response -> completeModify(request, response));
- }
-
- @Override
- void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) {
- if (request instanceof CommitLocalTransactionRequest) {
- replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback);
- } else if (request instanceof AbortLocalTransactionRequest) {
- sendRequest(abortRequest(), callback);
+ private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
+ if (enqueuedTicks.isPresent()) {
+ enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
} else {
- throw new IllegalStateException("Unhandled request " + request);
+ sendRequest(request, response -> completeModify(request, response));
}
}
- private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
- final Consumer<Response<?, ?>> callback) {
- final DataTreeModification mod = request.getModification();
- mod.applyToCursor(new AbstractDataTreeModificationCursor() {
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- doWrite(current().node(child), data);
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- doMerge(current().node(child), data);
- }
-
- @Override
- public void delete(final PathArgument child) {
- doDelete(current().node(child));
- }
- });
-
- sendRequest(commitRequest(request.isCoordinated()), callback);
- }
-
- @Override
- void handleForwardedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback) {
- nextSequence();
-
- if (callback == null) {
- sendModification(request);
- return;
- }
-
- /*
- * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null
- * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below.
- */
- final Consumer<Response<?, ?>> findBugsIsStupid = callback;
-
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
- sendRequest(request, response -> {
- findBugsIsStupid.accept(Preconditions.checkNotNull(response));
- completeModify(request, response);
- });
+ private void appendModification(final TransactionModification modification) {
+ appendModification(modification, Optional.absent());
}
- private void appendModification(final TransactionModification modification) {
+ private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
if (operationFailure == null) {
ensureInitializedBuilder();
builder.addModification(modification);
if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
- flushBuilder();
+ flushBuilder(enqueuedTicks);
}
} else {
LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
if (builderBusy) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
- successor.handleForwardedRemoteRequest(request, null);
+ forwardToSuccessor(successor, request, null);
}
}
if (maybeProto.isPresent()) {
ensureSealed();
+ final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
case ABORT:
- sendRequest(abortRequest(), callback);
+ tmp = abortRequest();
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
break;
case SIMPLE:
- sendRequest(commitRequest(false), callback);
+ tmp = commitRequest(false);
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
break;
case THREE_PHASE:
- sendRequest(commitRequest(true), callback);
+ tmp = commitRequest(true);
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
break;
case READY:
//no op
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+ ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ recordFinishedRequest();
+ callback.accept(resp);
+ });
} else if (request instanceof ExistsTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+ ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ recordFinishedRequest();
+ callback.accept(resp);
+ });
} else if (request instanceof TransactionPreCommitRequest) {
ensureFlushedBuider();
- sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+ localActor());
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
} else if (request instanceof TransactionDoCommitRequest) {
ensureFlushedBuider();
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
ensureFlushedBuider();
sendAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
- purge();
+ sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
final Consumer<Response<?, ?>> callback) {
successor.handleForwardedRemoteRequest(request, callback);
}
+
+ @Override
+ void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ if (request instanceof CommitLocalTransactionRequest) {
+ replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ enqueueRequest(abortRequest(), callback, enqueuedTicks);
+ } else {
+ throw new IllegalStateException("Unhandled request " + request);
+ }
+ }
+
+ private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final DataTreeModification mod = request.getModification();
+ final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+ mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ appendModification(new TransactionWrite(current().node(child), data), optTicks);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ appendModification(new TransactionMerge(current().node(child), data), optTicks);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ appendModification(new TransactionDelete(current().node(child)), optTicks);
+ }
+ });
+
+ enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
+ }
+
+ @Override
+ void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+ final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+
+ if (request instanceof ModifyTransactionRequest) {
+ final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+ for (TransactionModification mod : req.getModifications()) {
+ appendModification(mod, optTicks);
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ ensureSealed();
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case READY:
+ //no op
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ } else if (request instanceof ReadTransactionRequest) {
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ recordFinishedRequest();
+ cb.accept(resp);
+ }, enqueuedTicks);
+ } else if (request instanceof ExistsTransactionRequest) {
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ recordFinishedRequest();
+ cb.accept(resp);
+ }, enqueuedTicks);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ ensureFlushedBuider(optTicks);
+ final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+ localActor());
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ ensureFlushedBuider(optTicks);
+ enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
+ enqueuedTicks);
+ } else if (request instanceof TransactionAbortRequest) {
+ ensureFlushedBuider(optTicks);
+ enqueueAbort(callback, enqueuedTicks);
+ } else if (request instanceof TransactionPurgeRequest) {
+ enqueuePurge(enqueuedTicks);
+ } else {
+ throw new IllegalArgumentException("Unhandled request {}" + request);
+ }
+ }
}
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
import com.google.common.primitives.UnsignedLong;
import java.util.ArrayList;
import java.util.List;
final List<ConnectionEntry> entries = new ArrayList<>();
final Consumer<Response<?, ?>> callback = createCallbackMock();
final ReadTransactionRequest request1 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
final ExistsTransactionRequest request2 =
- new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+ new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
transaction.recordSuccessfulRequest(successful1);
final ReadTransactionRequest successful2 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
transaction.recordSuccessfulRequest(successful2);
transaction.startReconnect();
transaction.replayMessages(successor.getTransaction(), entries);
Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
- Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+
+ ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(request1.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
+ Assert.assertNotNull(tmpExist);
+ Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
+ Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
+ Assert.assertEquals(request2.getPath(), tmpExist.getPath());
+ Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
}
protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
}
protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
- transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+ transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
final T received = (T) envelope.getMessage();
Assert.assertTrue(received.getClass().equals(request.getClass()));
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Test;
final ReadTransactionRequest request =
new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
final Consumer<Response<?, ?>> callback = createCallbackMock();
- transaction.handleForwardedRemoteRequest(request, callback);
+ transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(callback).accept(captor.capture());
final Response value = captor.getValue();
final ExistsTransactionRequest request =
new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
final Consumer<Response<?, ?>> callback = createCallbackMock();
- transaction.handleForwardedRemoteRequest(request, callback);
+ transaction.handleReplayedRemoteRequest(request, callback, Ticker.systemTicker().read());
final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(callback).accept(captor.capture());
final Response value = captor.getValue();
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
import com.google.common.base.VerifyException;
import org.junit.Assert;
import org.junit.Test;
builder.setSequence(0);
builder.setAbort();
final ModifyTransactionRequest request = builder.build();
- transaction.applyModifyTransactionRequest(request, createCallbackMock());
+ transaction.replayModifyTransactionRequest(request, createCallbackMock(), Ticker.systemTicker().read());
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}
builder.setSequence(0);
builder.setReady();
final ModifyTransactionRequest request = builder.build();
- assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()),
- VerifyException.class);
+ assertOperationThrowsException(() -> transaction.replayModifyTransactionRequest(request, createCallbackMock(),
+ Ticker.systemTicker().read()), VerifyException.class);
}
-
}
\ No newline at end of file
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.function.Consumer;
import org.junit.Assert;
builder.setAbort();
final ModifyTransactionRequest request = builder.build();
final Consumer<Response<?, ?>> callback = createCallbackMock();
- transaction.applyModifyTransactionRequest(request, callback);
+ transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}
builder.setCommit(coordinated);
final ModifyTransactionRequest request = builder.build();
final Consumer<Response<?, ?>> callback = createCallbackMock();
- transaction.applyModifyTransactionRequest(request, callback);
+ transaction.replayModifyTransactionRequest(request, callback, Ticker.systemTicker().read());
verify(modification).write(PATH_1, DATA_1);
verify(modification).merge(PATH_2, DATA_2);
verify(modification).delete(PATH_3);
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import akka.actor.ActorRef;
import akka.testkit.TestProbe;
import javax.annotation.Nonnull;
import org.junit.Assert;
this.backendProbe = backendProbe;
}
+ ActorRef localActor() {
+ return connection.localActor();
+ }
+
T getTransaction() {
return transaction;
}