Fixup checkstyle
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ClientTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11
12 import com.google.common.annotations.Beta;
13 import com.google.common.util.concurrent.FluentFuture;
14 import java.util.Collection;
15 import java.util.Map;
16 import java.util.Optional;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
20 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
23 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
25
26 /**
27  * Client-side view of a transaction.
28  *
29  * <p>
30  * This interface is used by the world outside of the actor system and in the actor system it is manifested via
31  * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
32  * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
33  *
34  * <p>
35  * It is internally composed of multiple {@link RemoteProxyTransaction}s, each responsible for a component shard.
36  *
37  * <p>
38  * Implementation is quite a bit complex, and involves cooperation with {@link AbstractClientHistory} for tracking
39  * gaps in transaction identifiers seen by backends.
40  *
41  * <p>
42  * These gaps need to be accounted for in the transaction setup message sent to a particular backend, so it can verify
43  * that the requested transaction is in-sequence. This is critical in ensuring that transactions (which are independent
44  * entities from message queueing perspective) do not get reodered -- thus allowing multiple in-flight transactions.
45  *
46  * <p>
47  * Alternative would be to force visibility by sending an abort request to all potential backends, but that would mean
48  * that even empty transactions increase load on all shards -- which would be a scalability issue.
49  *
50  * <p>
51  * Yet another alternative would be to introduce inter-transaction dependencies to the queueing layer in client actor,
52  * but that would require additional indirection and complexity.
53  *
54  * @author Robert Varga
55  */
56 @Beta
57 public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
58     ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
59         super(parent, transactionId);
60     }
61
62     public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
63         return ensureProxy(path).exists(path);
64     }
65
66     public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
67         return path.isEmpty() ? readRoot() : ensureProxy(path).read(path);
68     }
69
70     private FluentFuture<Optional<NormalizedNode>> readRoot() {
71         return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies()
72             .map(proxy -> proxy.read(YangInstanceIdentifier.of())));
73     }
74
75     public void delete(final YangInstanceIdentifier path) {
76         if (path.isEmpty()) {
77             ensureAllProxies().forEach(proxy -> proxy.delete(YangInstanceIdentifier.of()));
78         } else {
79             ensureProxy(path).delete(path);
80         }
81     }
82
83     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
84         if (path.isEmpty()) {
85             mergeRoot(RootScatterGather.castRootNode(data));
86         } else {
87             ensureProxy(path).merge(path, data);
88         }
89     }
90
91     private void mergeRoot(final @NonNull ContainerNode rootData) {
92         if (!rootData.isEmpty()) {
93             RootScatterGather.scatterTouched(rootData, this::ensureProxy).forEach(
94                 scattered -> scattered.shard().merge(YangInstanceIdentifier.of(), scattered.container()));
95         }
96     }
97
98     public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
99         if (path.isEmpty()) {
100             writeRoot(RootScatterGather.castRootNode(data));
101         } else {
102             ensureProxy(path).write(path, data);
103         }
104     }
105
106     private void writeRoot(final @NonNull ContainerNode rootData) {
107         RootScatterGather.scatterAll(rootData, this::ensureProxy, ensureAllProxies()).forEach(
108             scattered -> scattered.shard().write(YangInstanceIdentifier.of(), scattered.container()));
109     }
110
111     private AbstractProxyTransaction ensureProxy(final PathArgument childId) {
112         return ensureProxy(YangInstanceIdentifier.of(childId));
113     }
114
115     public DOMStoreThreePhaseCommitCohort ready() {
116         final Map<Long, AbstractProxyTransaction> participants = ensureClosed();
117         checkState(participants != null, "Attempted to submit a closed transaction %s", this);
118
119         final Collection<AbstractProxyTransaction> toReady = participants.values();
120         toReady.forEach(AbstractProxyTransaction::seal);
121
122         final TransactionIdentifier txId = getIdentifier();
123         final AbstractClientHistory parent = parent();
124         parent.onTransactionShardsBound(txId, participants.keySet());
125
126         final AbstractTransactionCommitCohort cohort = switch (toReady.size()) {
127             case 0 -> new EmptyTransactionCommitCohort(parent, txId);
128             case 1 -> new DirectTransactionCommitCohort(parent, txId, toReady.iterator().next());
129             default -> new ClientTransactionCommitCohort(parent, txId, toReady);
130         };
131         return parent.onTransactionReady(this, cohort);
132     }
133
134     @Override
135     final AbstractProxyTransaction createProxy(final Long shard) {
136         return parent().createTransactionProxy(getIdentifier(), shard);
137     }
138 }