import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@NotThreadSafe
final class LocalProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
- private static final Consumer<Response<?, ?>> ABORT_COMPLETER = response -> {
- LOG.debug("Abort completed with {}", response);
- };
private final TransactionIdentifier identifier;
- private DataTreeModification modification;
- LocalProxyTransaction(final DistributedDataStoreClientBehavior client,
- final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) {
- super(client);
+ private CursorAwareDataTreeModification modification;
+
+ LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+ final CursorAwareDataTreeModification modification) {
+ super(parent);
this.identifier = Preconditions.checkNotNull(identifier);
- this.modification = snapshot.newModification();
+ this.modification = Preconditions.checkNotNull(modification);
}
@Override
return Futures.immediateCheckedFuture(modification.readNode(path));
}
+ private RuntimeException abortedException() {
+ return new IllegalStateException("Tracker " + identifier + " has been aborted");
+ }
+
+ private RuntimeException submittedException() {
+ return new IllegalStateException("Tracker " + identifier + " has been submitted");
+ }
+
@Override
void doAbort() {
- sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER);
- modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
+ sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
+ LOG.debug("Transaction {} abort completed with {}", identifier, response);
+ });
}
@Override
- CommitLocalTransactionRequest doCommit(final boolean coordinated) {
+ CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
modification, coordinated);
- modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
+ modification = new FailedDataTreeModification(this::submittedException);
return ret;
}
void doSeal() {
modification.ready();
}
+
+ DataTreeSnapshot getSnapshot() {
+ return modification;
+ }
+
+ private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ for (TransactionModification mod : request.getModifications()) {
+ if (mod instanceof TransactionWrite) {
+ modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+ } else if (mod instanceof TransactionMerge) {
+ modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+ } else if (mod instanceof TransactionDelete) {
+ modification.delete(mod.getPath());
+ } else {
+ throw new IllegalArgumentException("Unsupported modification " + mod);
+ }
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+ if (maybeProtocol.isPresent()) {
+ seal();
+ Verify.verify(callback != null, "Request {} has null callback", request);
+
+ switch (maybeProtocol.get()) {
+ case ABORT:
+ sendAbort(callback);
+ break;
+ case SIMPLE:
+ sendRequest(commitRequest(false), callback);
+ break;
+ case THREE_PHASE:
+ sendRequest(commitRequest(true), callback);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+ }
+ }
+ }
+
+ @Override
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ LOG.debug("Applying forwaded request {}", request);
+
+ if (request instanceof ModifyTransactionRequest) {
+ applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ if (request instanceof CommitLocalTransactionRequest) {
+ final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+ final DataTreeModification mod = req.getModification();
+
+ LOG.debug("Applying modification {} to successor {}", mod, successor);
+ mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.write(current().node(child), data);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.merge(current().node(child), data);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ successor.delete(current().node(child));
+ }
+ });
+
+ successor.seal();
+
+ final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+ successor.sendRequest(successorReq, callback);
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ LOG.debug("Forwarding abort {} to successor {}", request, successor);
+ successor.abort();
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+ }
+
+ @Override
+ void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ if (request instanceof AbortLocalTransactionRequest) {
+ successor.sendAbort(request, callback);
+ } else if (request instanceof CommitLocalTransactionRequest) {
+ successor.sendCommit((CommitLocalTransactionRequest)request, callback);
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+
+ LOG.debug("Forwarded request {} to successor {}", request, successor);
+ }
+
+ private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ sendRequest(request, callback);
+ modification = new FailedDataTreeModification(this::abortedException);
+ }
+
+ private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+ // Rebase old modification on new data tree.
+ try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+ request.getModification().applyToCursor(cursor);
+ }
+
+ seal();
+ sendRequest(commitRequest(request.isCoordinated()), callback);
+ }
}