* An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
* not known or is known to be not co-located with the client.
*
+ * <p>
* It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
* maintaining any submitted operations until the leader is discovered.
*
+ * <p>
* This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
* transitions based on backend responses are thread-safe.
*
RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
final TransactionIdentifier identifier) {
super(client);
- builder = new ModifyTransactionRequestBuilder(identifier, client.self());
+ builder = new ModifyTransactionRequestBuilder(identifier, localActor());
}
@Override
// Make sure we send any modifications before issuing a read
ensureFlushedBuider();
- client().sendRequest(request, completer);
+ sendRequest(request, completer);
return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
}
@Override
CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
final SettableFuture<Boolean> future = SettableFuture.create();
- return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+ return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
t -> completeExists(future, t), future);
}
@Override
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
- return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+ return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
t -> completeRead(future, t), future);
}
}
private void flushBuilder() {
- client().sendRequest(builder.build(), this::completeModify);
+ final ModifyTransactionRequest message = builder.build();
builderBusy = false;
+
+ sendRequest(message, this::completeModify);
}
private void appendModification(final TransactionModification modification) {
}
}
- private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
+ private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
+ final Response<?, ?> response) {
LOG.debug("Read request completed with {}", response);
if (response instanceof ReadTransactionSuccess) {