BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalProxyTransaction.java
index 576fa67ed467afc05fe871c1db424aa2a45eca17..7b652f474b80aa8a0fbdd8f6e48214d5d7487370 100644 (file)
@@ -9,18 +9,32 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,18 +57,16 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 final class LocalProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
-    private static final Consumer<Response<?, ?>> ABORT_COMPLETER = response -> {
-        LOG.debug("Abort completed with {}", response);
-    };
 
     private final TransactionIdentifier identifier;
-    private DataTreeModification modification;
 
-    LocalProxyTransaction(final DistributedDataStoreClientBehavior client,
-        final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) {
-        super(client);
+    private CursorAwareDataTreeModification modification;
+
+    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
+        final CursorAwareDataTreeModification modification) {
+        super(parent);
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.modification = snapshot.newModification();
+        this.modification = Preconditions.checkNotNull(modification);
     }
 
     @Override
@@ -87,17 +99,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
         return Futures.immediateCheckedFuture(modification.readNode(path));
     }
 
+    private RuntimeException abortedException() {
+        return new IllegalStateException("Tracker " + identifier + " has been aborted");
+    }
+
+    private RuntimeException submittedException() {
+        return new IllegalStateException("Tracker " + identifier + " has been submitted");
+    }
+
     @Override
     void doAbort() {
-        sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER);
-        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
+        sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
+            LOG.debug("Transaction {} abort completed with {}", identifier, response);
+        });
     }
 
     @Override
-    CommitLocalTransactionRequest doCommit(final boolean coordinated) {
+    CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
             modification, coordinated);
-        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
+        modification = new FailedDataTreeModification(this::submittedException);
         return ret;
     }
 
@@ -105,4 +126,121 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     void doSeal() {
         modification.ready();
     }
+
+    DataTreeSnapshot getSnapshot() {
+        return modification;
+    }
+
+    private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        for (TransactionModification mod : request.getModifications()) {
+            if (mod instanceof TransactionWrite) {
+                modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+            } else if (mod instanceof TransactionMerge) {
+                modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+            } else if (mod instanceof TransactionDelete) {
+                modification.delete(mod.getPath());
+            } else {
+                throw new IllegalArgumentException("Unsupported modification " + mod);
+            }
+        }
+
+        final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+        if (maybeProtocol.isPresent()) {
+            seal();
+            Verify.verify(callback != null, "Request {} has null callback", request);
+
+            switch (maybeProtocol.get()) {
+                case ABORT:
+                    sendAbort(callback);
+                    break;
+                case SIMPLE:
+                    sendRequest(commitRequest(false), callback);
+                    break;
+                case THREE_PHASE:
+                    sendRequest(commitRequest(true), callback);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+            }
+        }
+    }
+
+    @Override
+    void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+            final @Nullable Consumer<Response<?, ?>> callback) {
+        LOG.debug("Applying forwaded request {}", request);
+
+        if (request instanceof ModifyTransactionRequest) {
+            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else {
+            throw new IllegalArgumentException("Unhandled request " + request);
+        }
+    }
+
+    @Override
+    void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        if (request instanceof CommitLocalTransactionRequest) {
+            final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+            final DataTreeModification mod = req.getModification();
+
+            LOG.debug("Applying modification {} to successor {}", mod, successor);
+            mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+                @Override
+                public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.write(current().node(child), data);
+                }
+
+                @Override
+                public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                    successor.merge(current().node(child), data);
+                }
+
+                @Override
+                public void delete(final PathArgument child) {
+                    successor.delete(current().node(child));
+                }
+            });
+
+            successor.seal();
+
+            final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+            successor.sendRequest(successorReq, callback);
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            LOG.debug("Forwarding abort {} to successor {}", request, successor);
+            successor.abort();
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+    }
+
+    @Override
+    void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) throws RequestException {
+        if (request instanceof AbortLocalTransactionRequest) {
+            successor.sendAbort(request, callback);
+        } else if (request instanceof CommitLocalTransactionRequest) {
+            successor.sendCommit((CommitLocalTransactionRequest)request, callback);
+        } else {
+            throw new IllegalArgumentException("Unhandled request" + request);
+        }
+
+        LOG.debug("Forwarded request {} to successor {}", request, successor);
+    }
+
+    private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        sendRequest(request, callback);
+        modification = new FailedDataTreeModification(this::abortedException);
+    }
+
+    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+        // Rebase old modification on new data tree.
+        try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+            request.getModification().applyToCursor(cursor);
+        }
+
+        seal();
+        sendRequest(commitRequest(request.isCoordinated()), callback);
+    }
 }