Bump odlparent to 5.0.0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalReadWriteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.Optional;
13 import java.util.function.BiConsumer;
14 import java.util.function.Consumer;
15 import java.util.function.Supplier;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
23 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
24 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
26 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
28 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
29 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
30 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
48  * the client instance. This class is NOT thread-safe.
49  *
50  * <p>
51  * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
52  * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
53  * leader.
54  *
55  * <p>
56  * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
57  * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
58  *
59  * @author Robert Varga
60  */
61 final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
62     private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);
63
64     /**
65      * This field needs to be accessed via {@link #getModification()}, which performs state checking to ensure
66      * the modification can actually be accessed.
67      */
68     private final CursorAwareDataTreeModification modification;
69
70     private Supplier<? extends RuntimeException> closedException;
71
72     private CursorAwareDataTreeModification sealedModification;
73
74     /**
75      * Recorded failure from previous operations. Normally we would want to propagate the error directly to the
76      * offending call site, but that exposes inconsistency in behavior during initial connection, when we go through
77      * {@link RemoteProxyTransaction}, which detects this sort of issues at canCommit/directCommit time on the backend.
78      *
79      * <p>
80      * We therefore do not report incurred exceptions directly, but report them once the user attempts to commit
81      * this transaction.
82      */
83     private Exception recordedFailure;
84
85     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
86         final DataTreeSnapshot snapshot) {
87         super(parent, identifier, false);
88         this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
89     }
90
91     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
92         super(parent, identifier, true);
93         // This is DONE transaction, this should never be touched
94         this.modification = null;
95     }
96
97     @Override
98     boolean isSnapshotOnly() {
99         return false;
100     }
101
102     @Override
103     CursorAwareDataTreeSnapshot readOnlyView() {
104         return getModification();
105     }
106
107     @Override
108     @SuppressWarnings("checkstyle:IllegalCatch")
109     void doDelete(final YangInstanceIdentifier path) {
110         final CursorAwareDataTreeModification mod = getModification();
111         if (recordedFailure != null) {
112             LOG.debug("Transaction {} recorded failure, ignoring delete of {}", getIdentifier(), path);
113             return;
114         }
115
116         try {
117             mod.delete(path);
118         } catch (Exception e) {
119             LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
120                 e);
121             recordedFailure = e;
122         }
123     }
124
125     @Override
126     @SuppressWarnings("checkstyle:IllegalCatch")
127     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
128         final CursorAwareDataTreeModification mod = getModification();
129         if (recordedFailure != null) {
130             LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path);
131             return;
132         }
133
134         try {
135             mod.merge(path, data);
136         } catch (Exception e) {
137             LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
138                 e);
139             recordedFailure = e;
140         }
141     }
142
143     @Override
144     @SuppressWarnings("checkstyle:IllegalCatch")
145     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
146         final CursorAwareDataTreeModification mod = getModification();
147         if (recordedFailure != null) {
148             LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path);
149             return;
150         }
151
152         try {
153             mod.write(path, data);
154         } catch (Exception e) {
155             LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
156                 e);
157             recordedFailure = e;
158         }
159     }
160
161     private RuntimeException abortedException() {
162         return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
163     }
164
165     private RuntimeException submittedException() {
166         return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
167     }
168
169     @Override
170     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
171         final CursorAwareDataTreeModification mod = getModification();
172         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
173             localActor(), mod, recordedFailure, coordinated);
174         closedException = this::submittedException;
175         return ret;
176     }
177
178     private void sealModification() {
179         Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this);
180         final CursorAwareDataTreeModification mod = getModification();
181         mod.ready();
182         sealedModification = mod;
183     }
184
185     @Override
186     boolean sealOnly() {
187         sealModification();
188         return super.sealOnly();
189     }
190
191     @Override
192     boolean sealAndSend(final Optional<Long> enqueuedTicks) {
193         sealModification();
194         return super.sealAndSend(enqueuedTicks);
195     }
196
197     @Override
198     Optional<ModifyTransactionRequest> flushState() {
199         final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
200         b.setSequence(0);
201
202         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
203             @Override
204             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
205                 b.addModification(new TransactionWrite(current().node(child), data));
206             }
207
208             @Override
209             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
210                 b.addModification(new TransactionMerge(current().node(child), data));
211             }
212
213             @Override
214             public void delete(final PathArgument child) {
215                 b.addModification(new TransactionDelete(current().node(child)));
216             }
217         });
218
219         return Optional.of(b.build());
220     }
221
222     DataTreeSnapshot getSnapshot() {
223         Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
224         return sealedModification;
225     }
226
227     @Override
228     void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
229             final Consumer<Response<?, ?>> callback) {
230         commonModifyTransactionRequest(request, callback, this::sendRequest);
231     }
232
233     @Override
234     void replayModifyTransactionRequest(final ModifyTransactionRequest request,
235             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
236         commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
237     }
238
239     private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
240             final @Nullable Consumer<Response<?, ?>> callback,
241             final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
242         for (final TransactionModification mod : request.getModifications()) {
243             if (mod instanceof TransactionWrite) {
244                 write(mod.getPath(), ((TransactionWrite)mod).getData());
245             } else if (mod instanceof TransactionMerge) {
246                 merge(mod.getPath(), ((TransactionMerge)mod).getData());
247             } else if (mod instanceof TransactionDelete) {
248                 delete(mod.getPath());
249             } else {
250                 throw new IllegalArgumentException("Unsupported modification " + mod);
251             }
252         }
253
254         final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
255         if (maybeProtocol.isPresent()) {
256             Verify.verify(callback != null, "Request %s has null callback", request);
257             if (markSealed()) {
258                 sealOnly();
259             }
260
261             switch (maybeProtocol.get()) {
262                 case ABORT:
263                     sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
264                     break;
265                 case READY:
266                     // No-op, as we have already issued a sealOnly() and we are not transmitting anything
267                     break;
268                 case SIMPLE:
269                     sendMethod.accept(commitRequest(false), callback);
270                     break;
271                 case THREE_PHASE:
272                     sendMethod.accept(commitRequest(true), callback);
273                     break;
274                 default:
275                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
276             }
277         }
278     }
279
280     @Override
281     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
282             final Consumer<Response<?, ?>> callback, final long now) {
283         if (request instanceof CommitLocalTransactionRequest) {
284             enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
285         } else {
286             super.handleReplayedLocalRequest(request, callback, now);
287         }
288     }
289
290     @Override
291     void handleReplayedRemoteRequest(final TransactionRequest<?> request,
292             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
293         LOG.debug("Applying replayed request {}", request);
294
295         if (request instanceof TransactionPreCommitRequest) {
296             enqueueRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
297                 enqueuedTicks);
298         } else if (request instanceof TransactionDoCommitRequest) {
299             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
300                 enqueuedTicks);
301         } else if (request instanceof TransactionAbortRequest) {
302             enqueueDoAbort(callback, enqueuedTicks);
303         } else {
304             super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
305         }
306     }
307
308     @Override
309     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
310         LOG.debug("Applying forwarded request {}", request);
311
312         if (request instanceof TransactionPreCommitRequest) {
313             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
314         } else if (request instanceof TransactionDoCommitRequest) {
315             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
316         } else if (request instanceof TransactionAbortRequest) {
317             sendDoAbort(callback);
318         } else {
319             super.handleForwardedRemoteRequest(request, callback);
320         }
321     }
322
323     @Override
324     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
325             final Consumer<Response<?, ?>> callback) {
326         if (request instanceof CommitLocalTransactionRequest) {
327             Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
328             ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback);
329             LOG.debug("Forwarded request {} to successor {}", request, successor);
330         } else {
331             super.forwardToLocal(successor, request, callback);
332         }
333     }
334
335     @Override
336     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
337         super.sendAbort(request, callback);
338         closedException = this::abortedException;
339     }
340
341     @Override
342     void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
343             final long enqueuedTicks) {
344         super.enqueueAbort(request, callback, enqueuedTicks);
345         closedException = this::abortedException;
346     }
347
348     private @NonNull CursorAwareDataTreeModification getModification() {
349         if (closedException != null) {
350             throw closedException.get();
351         }
352
353         return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
354     }
355
356     private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
357         sendRequest(rebaseCommit(request), callback);
358     }
359
360     private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
361         // Rebase old modification on new data tree.
362         final CursorAwareDataTreeModification mod = getModification();
363
364         try (DataTreeModificationCursor cursor = mod.openCursor()) {
365             request.getModification().applyToCursor(cursor);
366         }
367
368         if (markSealed()) {
369             sealOnly();
370         }
371
372         return commitRequest(request.isCoordinated());
373     }
374 }