BUG-8618: rework AbstractProxyTransaction.flushState()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalReadWriteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.Optional;
13 import java.util.function.BiConsumer;
14 import java.util.function.Consumer;
15 import java.util.function.Supplier;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import javax.annotation.concurrent.NotThreadSafe;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
24 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
25 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
27 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
29 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
30 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
33 import org.opendaylight.controller.cluster.access.concepts.Response;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
49  * the client instance.
50  *
51  * <p>
52  * It requires a {@link DataTreeSnapshot}, which is used to instantiate a new {@link DataTreeModification}. Operations
53  * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
54  * leader.
55  *
56  * <p>
57  * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
58  * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
59  *
60  * @author Robert Varga
61  */
62 @NotThreadSafe
63 final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
    private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);

    /**
     * This field needs to be accessed via {@link #getModification()}, which performs state checking to ensure
     * the modification can actually be accessed. It is {@code null} for instances created through the
     * two-argument (DONE) constructor.
     */
    private final CursorAwareDataTreeModification modification;

    /**
     * Factory for the exception {@link #getModification()} throws once this transaction can no longer be modified.
     * It stays {@code null} until a commit request has been built or an abort has been sent or enqueued.
     */
    private Supplier<? extends RuntimeException> closedException;

    /**
     * The modification after {@code ready()} has been invoked on it while sealing; {@code null} until then.
     * Used by {@link #flushState()} and {@link #getSnapshot()}.
     */
    private CursorAwareDataTreeModification sealedModification;

    /**
     * Recorded failure from previous operations. Normally we would want to propagate the error directly to the
     * offending call site, but that exposes inconsistency in behavior during initial connection, when we go through
     * {@link RemoteProxyTransaction}, which detects these sorts of issues at canCommit/directCommit time on the
     * backend.
     *
     * <p>
     * We therefore do not report incurred exceptions directly, but report them once the user attempts to commit
     * this transaction.
     */
    private Exception recordedFailure;
86
87     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
88         final DataTreeSnapshot snapshot) {
89         super(parent, identifier, false);
90         this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
91     }
92
93     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
94         super(parent, identifier, true);
95         // This is DONE transaction, this should never be touched
96         this.modification = null;
97     }
98
99     @Override
100     boolean isSnapshotOnly() {
101         return false;
102     }
103
104     @Override
105     CursorAwareDataTreeSnapshot readOnlyView() {
106         return getModification();
107     }
108
109     @Override
110     @SuppressWarnings("checkstyle:IllegalCatch")
111     void doDelete(final YangInstanceIdentifier path) {
112         final CursorAwareDataTreeModification mod = getModification();
113         if (recordedFailure != null) {
114             LOG.debug("Transaction {} recorded failure, ignoring delete of {}", getIdentifier(), path);
115             return;
116         }
117
118         try {
119             mod.delete(path);
120         } catch (Exception e) {
121             LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
122                 e);
123             recordedFailure = e;
124         }
125     }
126
127     @Override
128     @SuppressWarnings("checkstyle:IllegalCatch")
129     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
130         final CursorAwareDataTreeModification mod = getModification();
131         if (recordedFailure != null) {
132             LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path);
133             return;
134         }
135
136         try {
137             mod.merge(path, data);
138         } catch (Exception e) {
139             LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
140                 e);
141             recordedFailure = e;
142         }
143     }
144
145     @Override
146     @SuppressWarnings("checkstyle:IllegalCatch")
147     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
148         final CursorAwareDataTreeModification mod = getModification();
149         if (recordedFailure != null) {
150             LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path);
151             return;
152         }
153
154         try {
155             mod.write(path, data);
156         } catch (Exception e) {
157             LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
158                 e);
159             recordedFailure = e;
160         }
161     }
162
163     private RuntimeException abortedException() {
164         return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
165     }
166
167     private RuntimeException submittedException() {
168         return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
169     }
170
171     @Override
172     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
173         final CursorAwareDataTreeModification mod = getModification();
174         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
175             localActor(), mod, recordedFailure, coordinated);
176         closedException = this::submittedException;
177         return ret;
178     }
179
180     private void sealModification() {
181         Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this);
182         final CursorAwareDataTreeModification mod = getModification();
183         mod.ready();
184         sealedModification = mod;
185     }
186
187     @Override
188     void sealOnly() {
189         sealModification();
190         super.sealOnly();
191     }
192
193     @Override
194     boolean sealAndSend(final com.google.common.base.Optional<Long> enqueuedTicks) {
195         sealModification();
196         return super.sealAndSend(enqueuedTicks);
197     }
198
199     @Override
200     Optional<ModifyTransactionRequest> flushState() {
201         final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
202         b.setSequence(0);
203
204         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
205             @Override
206             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
207                 b.addModification(new TransactionWrite(current().node(child), data));
208             }
209
210             @Override
211             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
212                 b.addModification(new TransactionMerge(current().node(child), data));
213             }
214
215             @Override
216             public void delete(final PathArgument child) {
217                 b.addModification(new TransactionDelete(current().node(child)));
218             }
219         });
220
221         return Optional.of(b.build());
222     }
223
224     DataTreeSnapshot getSnapshot() {
225         Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
226         return sealedModification;
227     }
228
229     @Override
230     void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
231             final @Nullable Consumer<Response<?, ?>> callback) {
232         commonModifyTransactionRequest(request, callback, this::sendRequest);
233     }
234
235     @Override
236     void replayModifyTransactionRequest(final ModifyTransactionRequest request,
237             final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
238         commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
239     }
240
241     private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
242             final @Nullable Consumer<Response<?, ?>> callback,
243             final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
244         for (final TransactionModification mod : request.getModifications()) {
245             if (mod instanceof TransactionWrite) {
246                 write(mod.getPath(), ((TransactionWrite)mod).getData());
247             } else if (mod instanceof TransactionMerge) {
248                 merge(mod.getPath(), ((TransactionMerge)mod).getData());
249             } else if (mod instanceof TransactionDelete) {
250                 delete(mod.getPath());
251             } else {
252                 throw new IllegalArgumentException("Unsupported modification " + mod);
253             }
254         }
255
256         final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
257         if (maybeProtocol.isPresent()) {
258             Verify.verify(callback != null, "Request %s has null callback", request);
259             if (markSealed()) {
260                 sealOnly();
261             }
262
263             switch (maybeProtocol.get()) {
264                 case ABORT:
265                     sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
266                     break;
267                 case READY:
268                     // No-op, as we have already issued a sealOnly() and we are not transmitting anything
269                     break;
270                 case SIMPLE:
271                     sendMethod.accept(commitRequest(false), callback);
272                     break;
273                 case THREE_PHASE:
274                     sendMethod.accept(commitRequest(true), callback);
275                     break;
276                 default:
277                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
278             }
279         }
280     }
281
282     @Override
283     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
284             final Consumer<Response<?, ?>> callback, final long now) {
285         if (request instanceof CommitLocalTransactionRequest) {
286             enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
287         } else {
288             super.handleReplayedLocalRequest(request, callback, now);
289         }
290     }
291
292     @Override
293     void handleReplayedRemoteRequest(final TransactionRequest<?> request,
294             final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
295         LOG.debug("Applying replayed request {}", request);
296
297         if (request instanceof TransactionPreCommitRequest) {
298             enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
299                 enqueuedTicks);
300         } else if (request instanceof TransactionDoCommitRequest) {
301             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
302                 enqueuedTicks);
303         } else if (request instanceof TransactionAbortRequest) {
304             enqueueDoAbort(callback, enqueuedTicks);
305         } else {
306             super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
307         }
308     }
309
310     @Override
311     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
312         LOG.debug("Applying forwarded request {}", request);
313
314         if (request instanceof TransactionPreCommitRequest) {
315             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
316         } else if (request instanceof TransactionDoCommitRequest) {
317             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
318         } else if (request instanceof TransactionAbortRequest) {
319             sendDoAbort(callback);
320         } else {
321             super.handleForwardedRemoteRequest(request, callback);
322         }
323     }
324
325     @Override
326     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
327             final Consumer<Response<?, ?>> callback) {
328         if (request instanceof CommitLocalTransactionRequest) {
329             Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
330             ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback);
331             LOG.debug("Forwarded request {} to successor {}", request, successor);
332         } else {
333             super.forwardToLocal(successor, request, callback);
334         }
335     }
336
337     @Override
338     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
339         super.sendAbort(request, callback);
340         closedException = this::abortedException;
341     }
342
343     @Override
344     void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
345             final long enqueuedTicks) {
346         super.enqueueAbort(request, callback, enqueuedTicks);
347         closedException = this::abortedException;
348     }
349
350     private @Nonnull CursorAwareDataTreeModification getModification() {
351         if (closedException != null) {
352             throw closedException.get();
353         }
354
355         return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
356     }
357
358     private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
359         sendRequest(rebaseCommit(request), callback);
360     }
361
362     private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
363         // Rebase old modification on new data tree.
364         final CursorAwareDataTreeModification mod = getModification();
365
366         try (DataTreeModificationCursor cursor = mod.createCursor(YangInstanceIdentifier.EMPTY)) {
367             request.getModification().applyToCursor(cursor);
368         }
369
370         if (markSealed()) {
371             sealOnly();
372         }
373
374         return commitRequest(request.isCoordinated());
375     }
376 }