X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=946e3341fd8f7778fdbf940299deb72112a61284;hb=HEAD;hp=284be4a45793699576e412c88d559d15bc916bba;hpb=2b702880c19e11be077ddcc540aeacd80ecfcaf6;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
index 284be4a457..946e3341fd 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
@@ -7,12 +7,14 @@
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import static com.google.common.base.Verify.verify;
+
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.function.Consumer;
-import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
@@ -47,7 +49,7 @@ import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,18 +64,14 @@ import org.slf4j.LoggerFactory;
*
* This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
* transitions based on backend responses are thread-safe.
- *
- * @author Robert Varga
*/
final class RemoteProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
- // FIXME: make this tuneable
- private static final int REQUEST_MAX_MODIFICATIONS = 1000;
-
private final ModifyTransactionRequestBuilder builder;
private final boolean sendReadyOnSeal;
private final boolean snapshotOnly;
+ private final int maxModifications;
private boolean builderBusy;
@@ -85,6 +83,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
+ maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount();
}
@Override
@@ -99,17 +98,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
@Override
void doDelete(final YangInstanceIdentifier path) {
- appendModification(new TransactionDelete(path), Optional.empty());
+ appendModification(new TransactionDelete(path), OptionalLong.empty());
}
@Override
- void doMerge(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
- appendModification(new TransactionMerge(path, data), Optional.empty());
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) {
+ appendModification(new TransactionMerge(path, data), OptionalLong.empty());
}
@Override
- void doWrite(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
- appendModification(new TransactionWrite(path, data), Optional.empty());
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) {
+ appendModification(new TransactionWrite(path, data), OptionalLong.empty());
}
private FluentFuture sendReadRequest(final AbstractReadTransactionRequest> request,
@@ -135,8 +134,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
@Override
- FluentFuture>> doRead(final YangInstanceIdentifier path) {
- final SettableFuture>> future = SettableFuture.create();
+ FluentFuture> doRead(final YangInstanceIdentifier path) {
+ final SettableFuture> future = SettableFuture.create();
return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
isSnapshotOnly()), t -> completeRead(path, future, t), future);
}
@@ -149,40 +148,40 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
private void ensureFlushedBuider() {
- ensureFlushedBuider(Optional.empty());
+ ensureFlushedBuider(OptionalLong.empty());
}
- private void ensureFlushedBuider(final Optional enqueuedTicks) {
+ private void ensureFlushedBuider(final OptionalLong enqueuedTicks) {
if (builderBusy) {
flushBuilder(enqueuedTicks);
}
}
- private void flushBuilder(final Optional enqueuedTicks) {
+ private void flushBuilder(final OptionalLong enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
sendModification(request, enqueuedTicks);
}
- private void sendModification(final TransactionRequest> request, final Optional enqueuedTicks) {
+ private void sendModification(final TransactionRequest> request, final OptionalLong enqueuedTicks) {
if (enqueuedTicks.isPresent()) {
- enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
+ enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.orElseThrow());
} else {
sendRequest(request, response -> completeModify(request, response));
}
}
private void appendModification(final TransactionModification modification) {
- appendModification(modification, Optional.empty());
+ appendModification(modification, OptionalLong.empty());
}
- private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) {
+ private void appendModification(final TransactionModification modification, final OptionalLong enqueuedTicks) {
if (operationFailure == null) {
ensureInitializedBuilder();
builder.addModification(modification);
- if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+ if (builder.size() >= maxModifications) {
flushBuilder(enqueuedTicks);
}
} else {
@@ -203,8 +202,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
private Exception recordFailedResponse(final Response, ?> response) {
final Exception failure;
- if (response instanceof RequestFailure) {
- final RequestException cause = ((RequestFailure, ?>) response).getCause();
+ if (response instanceof RequestFailure, ?> requestFailure) {
+ final RequestException cause = requestFailure.getCause();
failure = cause instanceof RequestTimeoutException
? new DataStoreUnavailableException(cause.getMessage(), cause) : cause;
} else {
@@ -228,8 +227,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
final Response, ?> response) {
LOG.debug("Exists request for {} completed with {}", path, response);
- if (response instanceof ExistsTransactionSuccess) {
- future.set(((ExistsTransactionSuccess) response).getExists());
+ if (response instanceof ExistsTransactionSuccess success) {
+ future.set(success.getExists());
} else {
failReadFuture(future, "Error executing exists request for path " + path, response);
}
@@ -237,12 +236,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
recordFinishedRequest(response);
}
- private void completeRead(final YangInstanceIdentifier path,
- final SettableFuture>> future, final Response, ?> response) {
+ private void completeRead(final YangInstanceIdentifier path, final SettableFuture> future,
+ final Response, ?> response) {
LOG.debug("Read request for {} completed with {}", path, response);
- if (response instanceof ReadTransactionSuccess) {
- future.set(((ReadTransactionSuccess) response).getData());
+ if (response instanceof ReadTransactionSuccess success) {
+ future.set(success.getData());
} else {
failReadFuture(future, "Error reading data for path " + path, response);
}
@@ -274,7 +273,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
@Override
- boolean sealAndSend(final Optional enqueuedTicks) {
+ boolean sealAndSend(final OptionalLong enqueuedTicks) {
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
@@ -284,14 +283,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
@Override
- java.util.Optional flushState() {
+ Optional flushState() {
if (!builderBusy) {
- return java.util.Optional.empty();
+ return Optional.empty();
}
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
- return java.util.Optional.of(request);
+ return Optional.of(request);
}
@Override
@@ -301,19 +300,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
void handleForwardedRequest(final TransactionRequest> request, final Consumer> callback) {
- if (request instanceof ModifyTransactionRequest) {
- handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ModifyTransactionRequest modifyRequest) {
+ handleForwardedModifyTransactionRequest(callback, modifyRequest);
+ } else if (request instanceof ReadTransactionRequest readRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ readRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
callback.accept(resp);
});
- } else if (request instanceof ExistsTransactionRequest) {
+ } else if (request instanceof ExistsTransactionRequest existsRequest) {
ensureFlushedBuider();
sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ existsRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
callback.accept(resp);
});
@@ -334,7 +333,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(callback);
} else {
- throw new IllegalArgumentException("Unhandled request {}" + request);
+ throw unhandledRequest(request);
}
}
@@ -342,16 +341,18 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
final ModifyTransactionRequest req) {
req.getModifications().forEach(this::appendModification);
- final java.util.Optional maybeProto = req.getPersistenceProtocol();
+ final Optional maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
// Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
// until we know what we are going to do.
if (markSealed()) {
- sealOnly();
+ if (!sealOnly()) {
+ LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this);
+ }
}
final TransactionRequest> tmp;
- switch (maybeProto.get()) {
+ switch (maybeProto.orElseThrow()) {
case ABORT:
tmp = abortRequest();
sendRequest(tmp, resp -> {
@@ -381,7 +382,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
});
break;
default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
}
}
}
@@ -395,28 +396,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
@Override
void handleReplayedLocalRequest(final AbstractLocalTransactionRequest> request,
final Consumer> callback, final long enqueuedTicks) {
- if (request instanceof CommitLocalTransactionRequest) {
- replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
+ if (request instanceof CommitLocalTransactionRequest commitRequest) {
+ replayLocalCommitRequest(commitRequest, callback, enqueuedTicks);
} else if (request instanceof AbortLocalTransactionRequest) {
enqueueRequest(abortRequest(), callback, enqueuedTicks);
} else {
- throw new IllegalStateException("Unhandled request " + request);
+ throw unhandledRequest(request);
}
}
private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
final Consumer> callback, final long enqueuedTicks) {
final DataTreeModification mod = request.getModification();
- final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+ final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
mod.applyToCursor(new AbstractDataTreeModificationCursor() {
@Override
- public void write(final PathArgument child, final NormalizedNode, ?> data) {
+ public void write(final PathArgument child, final NormalizedNode data) {
appendModification(new TransactionWrite(current().node(child), data), optTicks);
}
@Override
- public void merge(final PathArgument child, final NormalizedNode, ?> data) {
+ public void merge(final PathArgument child, final NormalizedNode data) {
appendModification(new TransactionMerge(current().node(child), data), optTicks);
}
@@ -430,24 +431,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
@Override
- void handleReplayedRemoteRequest(final TransactionRequest> request,
- @Nullable final Consumer> callback, final long enqueuedTicks) {
+ void handleReplayedRemoteRequest(final TransactionRequest> request, final Consumer> callback,
+ final long enqueuedTicks) {
final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ };
- final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks));
+ final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
- if (request instanceof ModifyTransactionRequest) {
- handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ModifyTransactionRequest modifyRequest) {
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, modifyRequest);
+ } else if (request instanceof ReadTransactionRequest readRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ readRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
- } else if (request instanceof ExistsTransactionRequest) {
+ } else if (request instanceof ExistsTransactionRequest existsRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
- ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
+ existsRequest.getPath(), isSnapshotOnly()), resp -> {
recordFinishedRequest(resp);
cb.accept(resp);
}, enqueuedTicks);
@@ -468,14 +469,13 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
enqueuePurge(callback, enqueuedTicks);
- } else if (request instanceof IncrementTransactionSequenceRequest) {
- final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
+ } else if (request instanceof IncrementTransactionSequenceRequest req) {
ensureFlushedBuider(optTicks);
enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
incrementSequence(req.getIncrement());
} else {
- throw new IllegalArgumentException("Unhandled request {}" + request);
+ throw unhandledRequest(request);
}
}
@@ -483,16 +483,16 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
final ModifyTransactionRequest req) {
req.getModifications().forEach(this::appendModification);
- final java.util.Optional maybeProto = req.getPersistenceProtocol();
+ final Optional maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
// Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
// until we know what we are going to do.
if (markSealed()) {
- sealOnly();
+ verify(sealOnly(), "Attempted to replay seal on %s", this);
}
final TransactionRequest> tmp;
- switch (maybeProto.get()) {
+ switch (maybeProto.orElseThrow()) {
case ABORT:
tmp = abortRequest();
enqueueRequest(tmp, resp -> {
@@ -522,7 +522,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
}, enqueuedTicks);
break;
default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
}
}
}