Bump upstreams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Iterables;
16 import com.google.common.util.concurrent.FluentFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.SortedSet;
24 import java.util.TreeMap;
25 import java.util.TreeSet;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
28 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
29 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
30 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
35 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
36 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
39 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.concurrent.Future;
44 import scala.concurrent.Promise;
45
46 /**
47  * A transaction potentially spanning multiple backend shards.
48  */
49 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier>
50         implements DOMStoreReadWriteTransaction {
51     private enum TransactionState {
52         OPEN,
53         READY,
54         CLOSED,
55     }
56
57     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
58     private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.of());
59
60     private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
61     private final AbstractTransactionContextFactory<?> txContextFactory;
62     private final TransactionType type;
63     private TransactionState state = TransactionState.OPEN;
64
65     @VisibleForTesting
66     public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
67         super(txContextFactory.nextIdentifier(), txContextFactory.getActorUtils().getDatastoreContext()
68                 .isTransactionDebugContextEnabled());
69         this.txContextFactory = txContextFactory;
70         this.type = requireNonNull(type);
71
72         LOG.debug("New {} Tx - {}", type, getIdentifier());
73     }
74
75     @Override
76     public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
77         return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
78     }
79
80     private <T> FluentFuture<T> executeRead(final String shardName, final AbstractRead<T> readCmd) {
81         checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
82
83         LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
84
85         final SettableFuture<T> proxyFuture = SettableFuture.create();
86         AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
87         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
88             @Override
89             public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
90                 transactionContext.executeRead(readCmd, proxyFuture, havePermit);
91             }
92         });
93
94         return FluentFuture.from(proxyFuture);
95     }
96
97     @Override
98     public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
99         checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
100         requireNonNull(path, "path should not be null");
101
102         LOG.trace("Tx {} read {}", getIdentifier(), path);
103         return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
104     }
105
106     private FluentFuture<Optional<NormalizedNode>> singleShardRead(final String shardName,
107             final YangInstanceIdentifier path) {
108         return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
109     }
110
111     private FluentFuture<Optional<NormalizedNode>> readAllData() {
112         final var actorUtils = getActorUtils();
113         return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
114             .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.of())));
115     }
116
117     @Override
118     public void delete(final YangInstanceIdentifier path) {
119         checkModificationState("delete", path);
120
121         if (path.isEmpty()) {
122             deleteAllData();
123         } else {
124             executeModification(new DeleteOperation(path));
125         }
126     }
127
128     private void deleteAllData() {
129         for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
130             wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
131         }
132     }
133
134     @Override
135     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
136         checkModificationState("merge", path);
137
138         if (path.isEmpty()) {
139             mergeAllData(RootScatterGather.castRootNode(data));
140         } else {
141             executeModification(new MergeOperation(path, data));
142         }
143     }
144
145     private void mergeAllData(final ContainerNode rootData) {
146         if (!rootData.isEmpty()) {
147             RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
148                 scattered -> scattered.shard().maybeExecuteTransactionOperation(
149                     new MergeOperation(YangInstanceIdentifier.of(), scattered.container())));
150         }
151     }
152
153     @Override
154     public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
155         checkModificationState("write", path);
156
157         if (path.isEmpty()) {
158             writeAllData(RootScatterGather.castRootNode(data));
159         } else {
160             executeModification(new WriteOperation(path, data));
161         }
162     }
163
164     private void writeAllData(final ContainerNode rootData) {
165         RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
166             getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
167                 scattered -> scattered.shard().maybeExecuteTransactionOperation(
168                     new WriteOperation(YangInstanceIdentifier.of(), scattered.container())));
169     }
170
171     private void executeModification(final TransactionModificationOperation operation) {
172         wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
173     }
174
175     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
176         checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
177         checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
178         LOG.trace("Tx {} {} {}", getIdentifier(), opName, path);
179     }
180
181     private boolean seal(final TransactionState newState) {
182         if (state == TransactionState.OPEN) {
183             state = newState;
184             return true;
185         }
186         return false;
187     }
188
189     @Override
190     public final void close() {
191         if (!seal(TransactionState.CLOSED)) {
192             checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
193                 getIdentifier());
194             // Idempotent no-op as per AutoCloseable recommendation
195             return;
196         }
197
198         for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
199             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
200                 @Override
201                 public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
202                     transactionContext.closeTransaction();
203                 }
204             });
205         }
206
207
208         txContextWrappers.clear();
209     }
210
211     @Override
212     public final AbstractThreePhaseCommitCohort<?> ready() {
213         checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
214
215         final boolean success = seal(TransactionState.READY);
216         checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
217
218         LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size());
219
220         final AbstractThreePhaseCommitCohort<?> ret = switch (txContextWrappers.size()) {
221             case 0 -> NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
222             case 1 -> {
223                 final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
224                         txContextWrappers.entrySet());
225                 yield createSingleCommitCohort(e.getKey(), e.getValue());
226             }
227             default -> createMultiCommitCohort();
228         };
229         txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
230
231         final Throwable debugContext = getDebugContext();
232         return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
233     }
234
235     @SuppressWarnings({ "rawtypes", "unchecked" })
236     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
237             final AbstractTransactionContextWrapper contextWrapper) {
238
239         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
240
241         final OperationCallback.Reference operationCallbackRef =
242                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
243
244         final TransactionContext transactionContext = contextWrapper.getTransactionContext();
245         final Future future;
246         if (transactionContext == null) {
247             final Promise promise = akka.dispatch.Futures.promise();
248             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
249                 @Override
250                 public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
251                     promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
252                         havePermit));
253                 }
254             });
255             future = promise.future();
256         } else {
257             // avoid the creation of a promise and a TransactionOperation
258             future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
259         }
260
261         return new SingleCommitCohortProxy(txContextFactory.getActorUtils(), future, getIdentifier(),
262             operationCallbackRef);
263     }
264
265     private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
266             final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
267         TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
268                 txContextFactory.getActorUtils());
269         operationCallbackRef.set(rateLimitingCallback);
270         rateLimitingCallback.run();
271         return transactionContext.directCommit(havePermit);
272     }
273
274     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
275
276         final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
277         final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
278         for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
279             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
280
281             final AbstractTransactionContextWrapper wrapper = e.getValue();
282
283             // The remote tx version is obtained the via TransactionContext which may not be available yet so
284             // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
285             // TransactionContext is available.
286             cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
287                 () -> wrapper.getTransactionContext().getTransactionVersion()));
288         }
289
290         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
291     }
292
293     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
294         return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
295     }
296
297     private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
298         return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.of(childId)));
299     }
300
301     private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
302         final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
303         if (existing != null) {
304             return existing;
305         }
306
307         final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
308         txContextWrappers.put(shardName, fresh);
309         return fresh;
310     }
311
312     TransactionType getType() {
313         return type;
314     }
315
316     boolean isReady() {
317         return state != TransactionState.OPEN;
318     }
319
320     final ActorUtils getActorUtils() {
321         return txContextFactory.getActorUtils();
322     }
323 }