Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalReadWriteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13
14 import com.google.common.util.concurrent.FluentFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Optional;
17 import java.util.OptionalLong;
18 import java.util.function.BiConsumer;
19 import java.util.function.Consumer;
20 import java.util.function.Supplier;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
25 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
26 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
28 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
29 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
30 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
39 import org.opendaylight.controller.cluster.access.concepts.Response;
40 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
41 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
42 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeModification;
49 import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeSnapshot;
50 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModificationCursor;
52 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
58  * the client instance. This class is NOT thread-safe.
59  *
60  * <p>
61  * It requires a {@link DataTreeSnapshot}, which is used to instantiate a new {@link DataTreeModification}. Operations
62  * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
63  * leader.
64  *
65  * <p>
66  * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
67  * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
68  *
69  * @author Robert Varga
70  */
71 final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
    private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);

    /**
     * This field needs to be accessed via {@link #getModification()}, which performs state checking to ensure
     * the modification can actually be accessed.
     */
    private final CursorAwareDataTreeModification modification;

    /**
     * Factory for the exception thrown by {@link #getModification()} once this transaction has been closed. It stays
     * {@code null} until {@code commitRequest()}, {@code sendAbort()} or {@code enqueueAbort()} installs a supplier.
     */
    private Supplier<? extends RuntimeException> closedException;

    /**
     * The modification after {@link #sealModification()} has invoked {@code ready()} on it, {@code null} before that.
     * It is read by {@link #flushState()} and {@link #getSnapshot()}.
     */
    private CursorAwareDataTreeModification sealedModification;

    /**
     * Recorded failure from previous operations. Normally we would want to propagate the error directly to the
     * offending call site, but that exposes inconsistency in behavior during initial connection, when we go through
     * {@link RemoteProxyTransaction}, which detects this sort of issues at canCommit/directCommit time on the backend.
     *
     * <p>
     * We therefore do not report incurred exceptions directly, but report them once the user attempts to commit
     * this transaction.
     */
    private Exception recordedFailure;
94
95     @SuppressWarnings("checkstyle:IllegalCatch")
96     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
97             final DataTreeSnapshot snapshot) {
98         super(parent, identifier, false);
99
100         if (snapshot instanceof FailedDataTreeModification failed) {
101             recordedFailure = failed.cause();
102             modification = failed;
103         } else {
104             CursorAwareDataTreeModification mod;
105             try {
106                 mod = (CursorAwareDataTreeModification) snapshot.newModification();
107             } catch (Exception e) {
108                 LOG.debug("Failed to instantiate modification for {}", identifier, e);
109                 recordedFailure = e;
110                 mod = new FailedDataTreeModification(snapshot.modelContext(), e);
111             }
112             modification = mod;
113         }
114     }
115
116     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
117         super(parent, identifier, true);
118         // This is DONE transaction, this should never be touched
119         modification = null;
120     }
121
122     @Override
123     boolean isSnapshotOnly() {
124         return false;
125     }
126
127     @Override
128     CursorAwareDataTreeSnapshot readOnlyView() {
129         return getModification();
130     }
131
132     @Override
133     FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
134         final var ex = recordedFailure;
135         return ex == null ? super.doExists(path)
136             : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
137     }
138
139     @Override
140     FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
141         final var ex = recordedFailure;
142         return ex == null ? super.doRead(path)
143             : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
144     }
145
146     @Override
147     @SuppressWarnings("checkstyle:IllegalCatch")
148     void doDelete(final YangInstanceIdentifier path) {
149         final CursorAwareDataTreeModification mod = getModification();
150         if (recordedFailure != null) {
151             LOG.debug("Transaction {} recorded failure, ignoring delete of {}", getIdentifier(), path);
152             return;
153         }
154
155         try {
156             mod.delete(path);
157         } catch (Exception e) {
158             LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
159                 e);
160             recordedFailure = e;
161         }
162     }
163
164     @Override
165     @SuppressWarnings("checkstyle:IllegalCatch")
166     void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) {
167         final CursorAwareDataTreeModification mod = getModification();
168         if (recordedFailure != null) {
169             LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path);
170             return;
171         }
172
173         try {
174             mod.merge(path, data);
175         } catch (Exception e) {
176             LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
177                 e);
178             recordedFailure = e;
179         }
180     }
181
182     @Override
183     @SuppressWarnings("checkstyle:IllegalCatch")
184     void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) {
185         final CursorAwareDataTreeModification mod = getModification();
186         if (recordedFailure != null) {
187             LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path);
188             return;
189         }
190
191         try {
192             mod.write(path, data);
193         } catch (Exception e) {
194             LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
195                 e);
196             recordedFailure = e;
197         }
198     }
199
200     private RuntimeException abortedException() {
201         return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
202     }
203
204     private RuntimeException submittedException() {
205         return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
206     }
207
208     @Override
209     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
210         final CursorAwareDataTreeModification mod = getModification();
211         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
212             localActor(), mod, recordedFailure, coordinated);
213         closedException = this::submittedException;
214         return ret;
215     }
216
217     private void sealModification() {
218         checkState(sealedModification == null, "Transaction %s is already sealed", this);
219         final CursorAwareDataTreeModification mod = getModification();
220         mod.ready();
221         sealedModification = mod;
222     }
223
224     @Override
225     boolean sealOnly() {
226         sealModification();
227         return super.sealOnly();
228     }
229
230     @Override
231     boolean sealAndSend(final OptionalLong enqueuedTicks) {
232         sealModification();
233         return super.sealAndSend(enqueuedTicks);
234     }
235
236     @Override
237     Optional<ModifyTransactionRequest> flushState() {
238         final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
239         b.setSequence(0);
240
241         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
242             @Override
243             public void write(final PathArgument child, final NormalizedNode data) {
244                 b.addModification(new TransactionWrite(current().node(child), data));
245             }
246
247             @Override
248             public void merge(final PathArgument child, final NormalizedNode data) {
249                 b.addModification(new TransactionMerge(current().node(child), data));
250             }
251
252             @Override
253             public void delete(final PathArgument child) {
254                 b.addModification(new TransactionDelete(current().node(child)));
255             }
256         });
257
258         return Optional.of(b.build());
259     }
260
261     CursorAwareDataTreeSnapshot getSnapshot() {
262         checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
263         return sealedModification;
264     }
265
266     @Override
267     void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
268             final Consumer<Response<?, ?>> callback) {
269         commonModifyTransactionRequest(request, callback, this::sendRequest);
270     }
271
272     @Override
273     void replayModifyTransactionRequest(final ModifyTransactionRequest request,
274             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
275         commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
276     }
277
278     private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
279             final @Nullable Consumer<Response<?, ?>> callback,
280             final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
281         for (final TransactionModification mod : request.getModifications()) {
282             if (mod instanceof TransactionWrite) {
283                 write(mod.getPath(), ((TransactionWrite)mod).getData());
284             } else if (mod instanceof TransactionMerge) {
285                 merge(mod.getPath(), ((TransactionMerge)mod).getData());
286             } else if (mod instanceof TransactionDelete) {
287                 delete(mod.getPath());
288             } else {
289                 throw new IllegalArgumentException("Unsupported modification " + mod);
290             }
291         }
292
293         final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
294         if (maybeProtocol.isPresent()) {
295             final var cb = verifyNotNull(callback, "Request %s has null callback", request);
296             if (markSealed()) {
297                 sealOnly();
298             }
299
300             switch (maybeProtocol.orElseThrow()) {
301                 case ABORT:
302                     sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), cb);
303                     break;
304                 case READY:
305                     // No-op, as we have already issued a sealOnly() and we are not transmitting anything
306                     break;
307                 case SIMPLE:
308                     sendMethod.accept(commitRequest(false), cb);
309                     break;
310                 case THREE_PHASE:
311                     sendMethod.accept(commitRequest(true), cb);
312                     break;
313                 default:
314                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.orElseThrow());
315             }
316         }
317     }
318
319     @Override
320     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
321             final Consumer<Response<?, ?>> callback, final long now) {
322         if (request instanceof CommitLocalTransactionRequest) {
323             enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
324         } else {
325             super.handleReplayedLocalRequest(request, callback, now);
326         }
327     }
328
329     @Override
330     void handleReplayedRemoteRequest(final TransactionRequest<?> request,
331             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
332         LOG.debug("Applying replayed request {}", request);
333
334         if (request instanceof TransactionPreCommitRequest) {
335             enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
336                 enqueuedTicks);
337         } else if (request instanceof TransactionDoCommitRequest) {
338             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
339                 enqueuedTicks);
340         } else if (request instanceof TransactionAbortRequest) {
341             enqueueDoAbort(callback, enqueuedTicks);
342         } else {
343             super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
344         }
345     }
346
347     @Override
348     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
349         LOG.debug("Applying forwarded request {}", request);
350
351         if (request instanceof TransactionPreCommitRequest) {
352             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
353         } else if (request instanceof TransactionDoCommitRequest) {
354             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
355         } else if (request instanceof TransactionAbortRequest) {
356             sendDoAbort(callback);
357         } else {
358             super.handleForwardedRemoteRequest(request, callback);
359         }
360     }
361
362     @Override
363     Response<?, ?> handleExistsRequest(final DataTreeSnapshot snapshot, final ExistsTransactionRequest request) {
364         final var ex = recordedFailure;
365         return ex == null ? super.handleExistsRequest(snapshot, request)
366             : request.toRequestFailure(
367                 new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
368     }
369
370     @Override
371     Response<?, ?> handleReadRequest(final DataTreeSnapshot snapshot, final ReadTransactionRequest request) {
372         final var ex = recordedFailure;
373         return ex == null ? super.handleReadRequest(snapshot, request)
374             : request.toRequestFailure(
375                 new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
376     }
377
378     @Override
379     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
380             final Consumer<Response<?, ?>> callback) {
381         if (request instanceof CommitLocalTransactionRequest) {
382             verifyLocalReadWrite(successor).sendRebased((CommitLocalTransactionRequest)request, callback);
383         } else if (request instanceof ModifyTransactionRequest) {
384             verifyLocalReadWrite(successor).handleForwardedRemoteRequest(request, callback);
385         } else {
386             super.forwardToLocal(successor, request, callback);
387             return;
388         }
389         LOG.debug("Forwarded request {} to successor {}", request, successor);
390     }
391
392     private static LocalReadWriteProxyTransaction verifyLocalReadWrite(final LocalProxyTransaction successor) {
393         verify(successor instanceof LocalReadWriteProxyTransaction, "Unexpected successor %s", successor);
394         return (LocalReadWriteProxyTransaction) successor;
395     }
396
397     @Override
398     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
399         super.sendAbort(request, callback);
400         closedException = this::abortedException;
401     }
402
403     @Override
404     void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
405             final long enqueuedTicks) {
406         super.enqueueAbort(request, callback, enqueuedTicks);
407         closedException = this::abortedException;
408     }
409
410     @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of recorded failure")
411     private @NonNull CursorAwareDataTreeModification getModification() {
412         if (closedException != null) {
413             throw closedException.get();
414         }
415         return verifyNotNull(modification, "Transaction %s is DONE", getIdentifier());
416     }
417
418     private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
419         sendRequest(rebaseCommit(request), callback);
420     }
421
422     private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
423         // Rebase old modification on new data tree.
424         final CursorAwareDataTreeModification mod = getModification();
425
426         if (!(mod instanceof FailedDataTreeModification)) {
427             request.getDelayedFailure().ifPresentOrElse(failure -> {
428                 if (recordedFailure == null) {
429                     recordedFailure = failure;
430                 } else {
431                     recordedFailure.addSuppressed(failure);
432                 }
433             }, () -> {
434                 try (DataTreeModificationCursor cursor = mod.openCursor()) {
435                     request.getModification().applyToCursor(cursor);
436                 }
437             });
438         }
439
440         if (markSealed()) {
441             sealOnly();
442         }
443
444         return commitRequest(request.isCoordinated());
445     }
446 }