import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* <p>
* This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
* transitions based on backend responses are thread-safe.
- *
- * @author Robert Varga
*/
final class RemoteProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
- // FIXME: make this tuneable
- private static final int REQUEST_MAX_MODIFICATIONS = 1000;
-
private final ModifyTransactionRequestBuilder builder;
private final boolean sendReadyOnSeal;
private final boolean snapshotOnly;
+ private final int maxModifications;
private boolean builderBusy;
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
+ maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount();
}
@Override
}
@Override
- void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) {
appendModification(new TransactionMerge(path, data), OptionalLong.empty());
}
@Override
- void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) {
appendModification(new TransactionWrite(path, data), OptionalLong.empty());
}
}
@Override
- FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(final YangInstanceIdentifier path) {
- final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
+ FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+ final SettableFuture<Optional<NormalizedNode>> future = SettableFuture.create();
return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
isSnapshotOnly()), t -> completeRead(path, future, t), future);
}
private void sendModification(final TransactionRequest<?> request, final OptionalLong enqueuedTicks) {
if (enqueuedTicks.isPresent()) {
- enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.getAsLong());
+ enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.orElseThrow());
} else {
sendRequest(request, response -> completeModify(request, response));
}
ensureInitializedBuilder();
builder.addModification(modification);
- if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+ if (builder.size() >= maxModifications) {
flushBuilder(enqueuedTicks);
}
} else {
private Exception recordFailedResponse(final Response<?, ?> response) {
final Exception failure;
- if (response instanceof RequestFailure) {
- final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
+ if (response instanceof RequestFailure<?, ?> requestFailure) {
+ final RequestException cause = requestFailure.getCause();
failure = cause instanceof RequestTimeoutException
? new DataStoreUnavailableException(cause.getMessage(), cause) : cause;
} else {
final Response<?, ?> response) {
LOG.debug("Exists request for {} completed with {}", path, response);
- if (response instanceof ExistsTransactionSuccess) {
- future.set(((ExistsTransactionSuccess) response).getExists());
+ if (response instanceof ExistsTransactionSuccess success) {
+ future.set(success.getExists());
} else {
failReadFuture(future, "Error executing exists request for path " + path, response);
}
recordFinishedRequest(response);
}
- private void completeRead(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
+ private void completeRead(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode>> future,
+ final Response<?, ?> response) {
LOG.debug("Read request for {} completed with {}", path, response);
- if (response instanceof ReadTransactionSuccess) {
- future.set(((ReadTransactionSuccess) response).getData());
+ if (response instanceof ReadTransactionSuccess success) {
+ future.set(success.getData());
} else {
failReadFuture(future, "Error reading data for path " + path, response);
}
}
void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
- if (request instanceof ModifyTransactionRequest) {
- handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ModifyTransactionRequest modifyRequest) {
+ handleForwardedModifyTransactionRequest(callback, modifyRequest);
+ } else if (request instanceof ReadTransactionRequest readRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ readRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
callback.accept(resp);
});
- } else if (request instanceof ExistsTransactionRequest) {
+ } else if (request instanceof ExistsTransactionRequest existsRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ existsRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
callback.accept(resp);
});
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(callback);
} else {
- throw new IllegalArgumentException("Unhandled request {}" + request);
+ throw unhandledRequest(request);
}
}
}
final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
+ switch (maybeProto.orElseThrow()) {
case ABORT:
tmp = abortRequest();
sendRequest(tmp, resp -> {
});
break;
default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
}
}
}
@Override
void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- if (request instanceof CommitLocalTransactionRequest) {
- replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
+ if (request instanceof CommitLocalTransactionRequest commitRequest) {
+ replayLocalCommitRequest(commitRequest, callback, enqueuedTicks);
} else if (request instanceof AbortLocalTransactionRequest) {
enqueueRequest(abortRequest(), callback, enqueuedTicks);
} else {
- throw new IllegalStateException("Unhandled request " + request);
+ throw unhandledRequest(request);
}
}
mod.applyToCursor(new AbstractDataTreeModificationCursor() {
@Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ public void write(final PathArgument child, final NormalizedNode data) {
appendModification(new TransactionWrite(current().node(child), data), optTicks);
}
@Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ public void merge(final PathArgument child, final NormalizedNode data) {
appendModification(new TransactionMerge(current().node(child), data), optTicks);
}
final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
- if (request instanceof ModifyTransactionRequest) {
- handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ModifyTransactionRequest modifyRequest) {
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, modifyRequest);
+ } else if (request instanceof ReadTransactionRequest readRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ readRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
- } else if (request instanceof ExistsTransactionRequest) {
+ } else if (request instanceof ExistsTransactionRequest existsRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ existsRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(callback, enqueuedTicks);
- } else if (request instanceof IncrementTransactionSequenceRequest) {
- final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+ } else if (request instanceof IncrementTransactionSequenceRequest req) {
ensureFlushedBuider(optTicks);
enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
incrementSequence(req.getIncrement());
} else {
- throw new IllegalArgumentException("Unhandled request {}" + request);
+ throw unhandledRequest(request);
}
}
}
final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
+ switch (maybeProto.orElseThrow()) {
case ABORT:
tmp = abortRequest();
enqueueRequest(tmp, resp -> {
}, enqueuedTicks);
break;
default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
}
}
}