Implement scatter/gather on module shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Iterables;
16 import com.google.common.util.concurrent.FluentFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.SortedSet;
24 import java.util.TreeMap;
25 import java.util.TreeSet;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
28 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
29 import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
30 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
35 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
36 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
39 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.concurrent.Future;
44 import scala.concurrent.Promise;
45
46 /**
47  * A transaction potentially spanning multiple backend shards.
48  */
49 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier>
50         implements DOMStoreReadWriteTransaction {
51     private enum TransactionState {
52         OPEN,
53         READY,
54         CLOSED,
55     }
56
57     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
58     private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
59
60     private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
61     private final AbstractTransactionContextFactory<?> txContextFactory;
62     private final TransactionType type;
63     private TransactionState state = TransactionState.OPEN;
64
65     @VisibleForTesting
66     public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
67         super(txContextFactory.nextIdentifier(), txContextFactory.getActorUtils().getDatastoreContext()
68                 .isTransactionDebugContextEnabled());
69         this.txContextFactory = txContextFactory;
70         this.type = requireNonNull(type);
71
72         LOG.debug("New {} Tx - {}", type, getIdentifier());
73     }
74
75     @Override
76     public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
77         return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
78     }
79
80     private <T> FluentFuture<T> executeRead(final String shardName, final AbstractRead<T> readCmd) {
81         checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
82
83         LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
84
85         final SettableFuture<T> proxyFuture = SettableFuture.create();
86         AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
87         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
88             @Override
89             public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
90                 transactionContext.executeRead(readCmd, proxyFuture, havePermit);
91             }
92         });
93
94         return FluentFuture.from(proxyFuture);
95     }
96
97     @Override
98     public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
99         checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
100         requireNonNull(path, "path should not be null");
101
102         LOG.trace("Tx {} read {}", getIdentifier(), path);
103         return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
104     }
105
106     private FluentFuture<Optional<NormalizedNode>> singleShardRead(final String shardName,
107             final YangInstanceIdentifier path) {
108         return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
109     }
110
111     private FluentFuture<Optional<NormalizedNode>> readAllData() {
112         final var actorUtils = getActorUtils();
113         return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
114             .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty())));
115     }
116
117     @Override
118     public void delete(final YangInstanceIdentifier path) {
119         checkModificationState("delete", path);
120
121         if (path.isEmpty()) {
122             deleteAllData();
123         } else {
124             executeModification(new DeleteOperation(path));
125         }
126     }
127
128     private void deleteAllData() {
129         for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
130             wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
131         }
132     }
133
134     @Override
135     public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
136         checkModificationState("merge", path);
137
138         if (path.isEmpty()) {
139             mergeAllData(RootScatterGather.castRootNode(data));
140         } else {
141             executeModification(new MergeOperation(path, data));
142         }
143     }
144
145     private void mergeAllData(final ContainerNode rootData) {
146         if (!rootData.isEmpty()) {
147             RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
148                 scattered -> scattered.shard().maybeExecuteTransactionOperation(
149                     new MergeOperation(YangInstanceIdentifier.empty(), scattered.container())));
150         }
151     }
152
153     @Override
154     public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
155         checkModificationState("write", path);
156
157         if (path.isEmpty()) {
158             writeAllData(RootScatterGather.castRootNode(data));
159         } else {
160             executeModification(new WriteOperation(path, data));
161         }
162     }
163
164     private void writeAllData(final ContainerNode rootData) {
165         RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
166             getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
167                 scattered -> scattered.shard().maybeExecuteTransactionOperation(
168                     new WriteOperation(YangInstanceIdentifier.empty(), scattered.container())));
169     }
170
171     private void executeModification(final TransactionModificationOperation operation) {
172         wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
173     }
174
175     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
176         checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
177         checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
178         LOG.trace("Tx {} {} {}", getIdentifier(), opName, path);
179     }
180
181     private boolean seal(final TransactionState newState) {
182         if (state == TransactionState.OPEN) {
183             state = newState;
184             return true;
185         }
186         return false;
187     }
188
189     @Override
190     public final void close() {
191         if (!seal(TransactionState.CLOSED)) {
192             checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
193                 getIdentifier());
194             // Idempotent no-op as per AutoCloseable recommendation
195             return;
196         }
197
198         for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
199             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
200                 @Override
201                 public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
202                     transactionContext.closeTransaction();
203                 }
204             });
205         }
206
207
208         txContextWrappers.clear();
209     }
210
211     @Override
212     public final AbstractThreePhaseCommitCohort<?> ready() {
213         checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
214
215         final boolean success = seal(TransactionState.READY);
216         checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
217
218         LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size());
219
220         final AbstractThreePhaseCommitCohort<?> ret;
221         switch (txContextWrappers.size()) {
222             case 0:
223                 ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
224                 break;
225             case 1:
226                 final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
227                         txContextWrappers.entrySet());
228                 ret = createSingleCommitCohort(e.getKey(), e.getValue());
229                 break;
230             default:
231                 ret = createMultiCommitCohort();
232         }
233
234         txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
235
236         final Throwable debugContext = getDebugContext();
237         return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
238     }
239
240     @SuppressWarnings({ "rawtypes", "unchecked" })
241     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
242             final AbstractTransactionContextWrapper contextWrapper) {
243
244         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
245
246         final OperationCallback.Reference operationCallbackRef =
247                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
248
249         final TransactionContext transactionContext = contextWrapper.getTransactionContext();
250         final Future future;
251         if (transactionContext == null) {
252             final Promise promise = akka.dispatch.Futures.promise();
253             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
254                 @Override
255                 public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
256                     promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
257                         havePermit));
258                 }
259             });
260             future = promise.future();
261         } else {
262             // avoid the creation of a promise and a TransactionOperation
263             future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
264         }
265
266         return new SingleCommitCohortProxy(txContextFactory.getActorUtils(), future, getIdentifier(),
267             operationCallbackRef);
268     }
269
270     private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
271             final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
272         TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
273                 txContextFactory.getActorUtils());
274         operationCallbackRef.set(rateLimitingCallback);
275         rateLimitingCallback.run();
276         return transactionContext.directCommit(havePermit);
277     }
278
279     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
280
281         final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
282         final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
283         for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
284             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
285
286             final AbstractTransactionContextWrapper wrapper = e.getValue();
287
288             // The remote tx version is obtained the via TransactionContext which may not be available yet so
289             // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
290             // TransactionContext is available.
291             cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
292                 () -> wrapper.getTransactionContext().getTransactionVersion()));
293         }
294
295         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
296     }
297
298     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
299         return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
300     }
301
302     private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
303         return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.create(childId)));
304     }
305
306     private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
307         final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
308         if (existing != null) {
309             return existing;
310         }
311
312         final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
313         txContextWrappers.put(shardName, fresh);
314         return fresh;
315     }
316
317     TransactionType getType() {
318         return type;
319     }
320
321     boolean isReady() {
322         return state != TransactionState.OPEN;
323     }
324
325     final ActorUtils getActorUtils() {
326         return txContextFactory.getActorUtils();
327     }
328 }