BUG-8372: fix AbstractProxyTransaction.replayMessages()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.function.Consumer;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
24 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
25 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
26 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
27 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
28 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
30 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
31 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
33 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
34 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
38 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
39 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
40 import org.opendaylight.controller.cluster.access.concepts.Response;
41 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
42 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
54  * not known or is known to be not co-located with the client.
55  *
56  * <p>
57  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
58  * maintaining any submitted operations until the leader is discovered.
59  *
60  * <p>
61  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
62  * transitions based on backend responses are thread-safe.
63  *
64  * @author Robert Varga
65  */
66 final class RemoteProxyTransaction extends AbstractProxyTransaction {
67     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
68
69     // FIXME: make this tuneable
70     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
71
72     private final ModifyTransactionRequestBuilder builder;
73     private final boolean sendReadyOnSeal;
74     private final boolean snapshotOnly;
75
76     private boolean builderBusy;
77
78     private volatile Exception operationFailure;
79
80     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
81             final boolean snapshotOnly, final boolean sendReadyOnSeal) {
82         super(parent);
83         this.snapshotOnly = snapshotOnly;
84         this.sendReadyOnSeal = sendReadyOnSeal;
85         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
86     }
87
88     @Override
89     boolean isSnapshotOnly() {
90         return snapshotOnly;
91     }
92
93     @Override
94     public TransactionIdentifier getIdentifier() {
95         return builder.getIdentifier();
96     }
97
98     @Override
99     void doDelete(final YangInstanceIdentifier path) {
100         appendModification(new TransactionDelete(path));
101     }
102
103     @Override
104     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
105         appendModification(new TransactionMerge(path, data));
106     }
107
108     @Override
109     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
110         appendModification(new TransactionWrite(path, data));
111     }
112
113     private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
114             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
115         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
116         final Exception local = operationFailure;
117         if (local != null) {
118             return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local));
119         }
120
121         // Make sure we send any modifications before issuing a read
122         ensureFlushedBuider();
123         sendRequest(request, completer);
124         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
125     }
126
127     @Override
128     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
129         final SettableFuture<Boolean> future = SettableFuture.create();
130         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
131             isSnapshotOnly()), t -> completeExists(future, t), future);
132     }
133
134     @Override
135     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
136         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
137         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
138             isSnapshotOnly()), t -> completeRead(future, t), future);
139     }
140
141     @Override
142     void doAbort() {
143         ensureInitializedBuilder();
144         builder.setAbort();
145         flushBuilder();
146     }
147
148     private void ensureInitializedBuilder() {
149         if (!builderBusy) {
150             builder.setSequence(nextSequence());
151             builderBusy = true;
152         }
153     }
154
155     private void ensureFlushedBuider() {
156         if (builderBusy) {
157             flushBuilder();
158         }
159     }
160
161     private void flushBuilder() {
162         final ModifyTransactionRequest request = builder.build();
163         builderBusy = false;
164
165         sendModification(request);
166     }
167
168     private void sendModification(final TransactionRequest<?> request) {
169         sendRequest(request, response -> completeModify(request, response));
170     }
171
172     @Override
173     void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
174             final Consumer<Response<?, ?>> callback) {
175         if (request instanceof CommitLocalTransactionRequest) {
176             replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback);
177         } else if (request instanceof AbortLocalTransactionRequest) {
178             sendRequest(abortRequest(), callback);
179         } else {
180             throw new IllegalStateException("Unhandled request " + request);
181         }
182     }
183
184     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
185             final Consumer<Response<?, ?>> callback) {
186         final DataTreeModification mod = request.getModification();
187         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
188             @Override
189             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
190                 doWrite(current().node(child), data);
191             }
192
193             @Override
194             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
195                 doMerge(current().node(child), data);
196             }
197
198             @Override
199             public void delete(final PathArgument child) {
200                 doDelete(current().node(child));
201             }
202         });
203
204         sendRequest(commitRequest(request.isCoordinated()), callback);
205     }
206
207     @Override
208     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
209             final @Nullable Consumer<Response<?, ?>> callback) {
210         nextSequence();
211
212         if (callback == null) {
213             sendModification(request);
214             return;
215         }
216
217         /*
218          * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null
219          * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below.
220          */
221         final Consumer<Response<?, ?>> findBugsIsStupid = callback;
222
223         // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
224         //        period required to get into the queue.
225         sendRequest(request, response -> {
226             findBugsIsStupid.accept(Preconditions.checkNotNull(response));
227             completeModify(request, response);
228         });
229     }
230
231     private void appendModification(final TransactionModification modification) {
232         if (operationFailure == null) {
233             ensureInitializedBuilder();
234
235             builder.addModification(modification);
236             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
237                 flushBuilder();
238             }
239         } else {
240             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
241         }
242     }
243
244     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
245         LOG.debug("Modification request {} completed with {}", request, response);
246
247         if (response instanceof TransactionSuccess) {
248             // Happy path
249             recordSuccessfulRequest(request);
250         } else {
251             recordFailedResponse(response);
252         }
253     }
254
255     private Exception recordFailedResponse(final Response<?, ?> response) {
256         final Exception failure;
257         if (response instanceof RequestFailure) {
258             failure = ((RequestFailure<?, ?>) response).getCause();
259         } else {
260             LOG.warn("Unhandled response {}", response);
261             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
262         }
263
264         if (operationFailure == null) {
265             LOG.debug("Transaction {} failed", getIdentifier(), failure);
266             operationFailure = failure;
267         }
268         return failure;
269     }
270
271     private void failFuture(final SettableFuture<?> future, final Response<?, ?> response) {
272         future.setException(recordFailedResponse(response));
273     }
274
275     private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
276         LOG.debug("Exists request completed with {}", response);
277
278         if (response instanceof ExistsTransactionSuccess) {
279             future.set(((ExistsTransactionSuccess) response).getExists());
280         } else {
281             failFuture(future, response);
282         }
283
284         recordFinishedRequest();
285     }
286
287     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
288             final Response<?, ?> response) {
289         LOG.debug("Read request completed with {}", response);
290
291         if (response instanceof ReadTransactionSuccess) {
292             future.set(((ReadTransactionSuccess) response).getData());
293         } else {
294             failFuture(future, response);
295         }
296
297         recordFinishedRequest();
298     }
299
300     private ModifyTransactionRequest abortRequest() {
301         ensureInitializedBuilder();
302         builder.setAbort();
303         final ModifyTransactionRequest ret = builder.build();
304         builderBusy = false;
305         return ret;
306     }
307
308     @Override
309     ModifyTransactionRequest commitRequest(final boolean coordinated) {
310         ensureInitializedBuilder();
311         builder.setCommit(coordinated);
312
313         final ModifyTransactionRequest ret = builder.build();
314         builderBusy = false;
315         return ret;
316     }
317
318     @Override
319     void doSeal() {
320         if (sendReadyOnSeal) {
321             ensureInitializedBuilder();
322             builder.setReady();
323             flushBuilder();
324         }
325     }
326
327     @Override
328     void flushState(final AbstractProxyTransaction successor) {
329         if (builderBusy) {
330             final ModifyTransactionRequest request = builder.build();
331             builderBusy = false;
332             successor.handleForwardedRemoteRequest(request, null);
333         }
334     }
335
336     @Override
337     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
338             final Consumer<Response<?, ?>> callback) {
339         successor.handleForwardedRequest(request, callback);
340     }
341
342     private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
343         if (request instanceof ModifyTransactionRequest) {
344             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
345
346             req.getModifications().forEach(this::appendModification);
347
348             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
349             if (maybeProto.isPresent()) {
350                 ensureSealed();
351
352                 switch (maybeProto.get()) {
353                     case ABORT:
354                         sendRequest(abortRequest(), callback);
355                         break;
356                     case SIMPLE:
357                         sendRequest(commitRequest(false), callback);
358                         break;
359                     case THREE_PHASE:
360                         sendRequest(commitRequest(true), callback);
361                         break;
362                     case READY:
363                         //no op
364                         break;
365                     default:
366                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
367                 }
368             }
369         } else if (request instanceof ReadTransactionRequest) {
370             ensureFlushedBuider();
371             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
372                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
373         } else if (request instanceof ExistsTransactionRequest) {
374             ensureFlushedBuider();
375             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
376                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
377         } else if (request instanceof TransactionPreCommitRequest) {
378             ensureFlushedBuider();
379             sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
380         } else if (request instanceof TransactionDoCommitRequest) {
381             ensureFlushedBuider();
382             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
383         } else if (request instanceof TransactionAbortRequest) {
384             ensureFlushedBuider();
385             sendAbort(callback);
386         } else if (request instanceof TransactionPurgeRequest) {
387             purge();
388         } else {
389             throw new IllegalArgumentException("Unhandled request {}" + request);
390         }
391     }
392
393     @Override
394     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
395             final Consumer<Response<?, ?>> callback) {
396         successor.handleForwardedRemoteRequest(request, callback);
397     }
398 }