720ada3191e7fd5d06892e0583dcabd55097ffa6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalReadWriteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.function.Consumer;
13 import java.util.function.Supplier;
14 import javax.annotation.Nullable;
15 import javax.annotation.concurrent.NotThreadSafe;
16 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
17 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
18 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
21 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
22 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
23 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
24 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
25 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
26 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
29 import org.opendaylight.controller.cluster.access.concepts.Response;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
45  * the client instance.
46  *
47  * <p>
48  * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
49  * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
50  * leader.
51  *
52  * <p>
53  * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
54  * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
55  *
56  * @author Robert Varga
57  */
58 @NotThreadSafe
59 final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
60     private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class);
61
62     /**
63      * This field needs to be accessed via {@link #getModification()}, which performs state checking to ensure
64      * the modification can actually be accessed.
65      */
66     private final CursorAwareDataTreeModification modification;
67
68     private Supplier<? extends RuntimeException> closedException;
69
70     private CursorAwareDataTreeModification sealedModification;
71
72     /**
73      * Recorded failure from previous operations. Normally we would want to propagate the error directly to the
74      * offending call site, but that exposes inconsistency in behavior during initial connection, when we go through
75      * {@link RemoteProxyTransaction}, which detects this sort of issues at canCommit/directCommit time on the backend.
76      *
77      * <p>
78      * We therefore do not report incurred exceptions directly, but report them once the user attempts to commit
79      * this transaction.
80      */
81     private Exception recordedFailure;
82
83     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
84         final DataTreeSnapshot snapshot) {
85         super(parent, identifier);
86         this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
87     }
88
89     @Override
90     boolean isSnapshotOnly() {
91         return false;
92     }
93
94     @Override
95     CursorAwareDataTreeSnapshot readOnlyView() {
96         return getModification();
97     }
98
99     @Override
100     @SuppressWarnings("checkstyle:IllegalCatch")
101     void doDelete(final YangInstanceIdentifier path) {
102         final CursorAwareDataTreeModification mod = getModification();
103         if (recordedFailure != null) {
104             LOG.debug("Transaction {} recorded failure, ignoring delete of {}", getIdentifier(), path);
105             return;
106         }
107
108         try {
109             mod.delete(path);
110         } catch (Exception e) {
111             LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
112                 e);
113             recordedFailure = e;
114         }
115     }
116
117     @Override
118     @SuppressWarnings("checkstyle:IllegalCatch")
119     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
120         final CursorAwareDataTreeModification mod = getModification();
121         if (recordedFailure != null) {
122             LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path);
123             return;
124         }
125
126         try {
127             mod.merge(path, data);
128         } catch (Exception e) {
129             LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
130                 e);
131             recordedFailure = e;
132         }
133     }
134
135     @Override
136     @SuppressWarnings("checkstyle:IllegalCatch")
137     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
138         final CursorAwareDataTreeModification mod = getModification();
139         if (recordedFailure != null) {
140             LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path);
141             return;
142         }
143
144         try {
145             mod.write(path, data);
146         } catch (Exception e) {
147             LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
148                 e);
149             recordedFailure = e;
150         }
151     }
152
153     private RuntimeException abortedException() {
154         return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted");
155     }
156
157     private RuntimeException submittedException() {
158         return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted");
159     }
160
161     @Override
162     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
163         final CursorAwareDataTreeModification mod = getModification();
164         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
165             localActor(), mod, recordedFailure, coordinated);
166         closedException = this::submittedException;
167         return ret;
168     }
169
170     @Override
171     void doSeal() {
172         Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier());
173         final CursorAwareDataTreeModification mod = getModification();
174         mod.ready();
175         sealedModification = mod;
176     }
177
178     @Override
179     void flushState(final AbstractProxyTransaction successor) {
180         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
181             @Override
182             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
183                 successor.write(current().node(child), data);
184             }
185
186             @Override
187             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
188                 successor.merge(current().node(child), data);
189             }
190
191             @Override
192             public void delete(final PathArgument child) {
193                 successor.delete(current().node(child));
194             }
195         });
196     }
197
198     DataTreeSnapshot getSnapshot() {
199         Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier());
200         return sealedModification;
201     }
202
203     @Override
204     void applyModifyTransactionRequest(final ModifyTransactionRequest request,
205             final @Nullable Consumer<Response<?, ?>> callback) {
206         for (final TransactionModification mod : request.getModifications()) {
207             if (mod instanceof TransactionWrite) {
208                 write(mod.getPath(), ((TransactionWrite)mod).getData());
209             } else if (mod instanceof TransactionMerge) {
210                 merge(mod.getPath(), ((TransactionMerge)mod).getData());
211             } else if (mod instanceof TransactionDelete) {
212                 delete(mod.getPath());
213             } else {
214                 throw new IllegalArgumentException("Unsupported modification " + mod);
215             }
216         }
217
218         final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
219         if (maybeProtocol.isPresent()) {
220             Verify.verify(callback != null, "Request {} has null callback", request);
221             ensureSealed();
222
223             switch (maybeProtocol.get()) {
224                 case ABORT:
225                     sendRequest(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
226                     break;
227                 case READY:
228                     // No-op, as we have already issued a seal()
229                     break;
230                 case SIMPLE:
231                     sendRequest(commitRequest(false), callback);
232                     break;
233                 case THREE_PHASE:
234                     sendRequest(commitRequest(true), callback);
235                     break;
236                 default:
237                     throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
238             }
239         }
240     }
241
242     @Override
243     void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
244             final Consumer<Response<?, ?>> callback) {
245         if (request instanceof CommitLocalTransactionRequest) {
246             sendCommit((CommitLocalTransactionRequest) request, callback);
247         } else {
248             super.handleForwardedLocalRequest(request, callback);
249         }
250     }
251
252     @Override
253     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
254             final @Nullable Consumer<Response<?, ?>> callback) {
255         LOG.debug("Applying forwarded request {}", request);
256
257         if (request instanceof TransactionPreCommitRequest) {
258             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
259         } else if (request instanceof TransactionDoCommitRequest) {
260             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
261         } else if (request instanceof TransactionAbortRequest) {
262             sendAbort(callback);
263         } else {
264             super.handleForwardedRemoteRequest(request, callback);
265         }
266     }
267
268     @Override
269     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
270             final Consumer<Response<?, ?>> callback) {
271         if (request instanceof CommitLocalTransactionRequest) {
272             Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
273             ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
274             LOG.debug("Forwarded request {} to successor {}", request, successor);
275         } else {
276             super.forwardToLocal(successor, request, callback);
277         }
278     }
279
280     @Override
281     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
282         super.sendAbort(request, callback);
283         closedException = this::abortedException;
284     }
285
286     private CursorAwareDataTreeModification getModification() {
287         if (closedException != null) {
288             throw closedException.get();
289         }
290
291         return modification;
292     }
293
294     private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
295         // Rebase old modification on new data tree.
296         final CursorAwareDataTreeModification mod = getModification();
297
298         try (DataTreeModificationCursor cursor = mod.createCursor(YangInstanceIdentifier.EMPTY)) {
299             request.getModification().applyToCursor(cursor);
300         }
301
302         ensureSealed();
303         sendRequest(commitRequest(request.isCoordinated()), callback);
304     }
305 }