Do not assert seal transition on forward path
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Verify.verify;
11
12 import com.google.common.base.Function;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Optional;
17 import java.util.function.Consumer;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
25 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
26 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
28 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
29 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
30 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
41 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
42 import org.opendaylight.controller.cluster.access.concepts.Response;
43 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
44 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
45 import org.opendaylight.mdsal.common.api.ReadFailedException;
46 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
56  * not known or is known to be not co-located with the client.
57  *
58  * <p>
59  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
60  * maintaining any submitted operations until the leader is discovered.
61  *
62  * <p>
63  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
64  * transitions based on backend responses are thread-safe.
65  *
66  * @author Robert Varga
67  */
68 final class RemoteProxyTransaction extends AbstractProxyTransaction {
69     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
70
71     private static final Function<Exception, Exception> NOOP_EXCEPTION_MAPPER = ex -> ex;
72
73     // FIXME: make this tuneable
74     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
75
76     private final ModifyTransactionRequestBuilder builder;
77     private final boolean sendReadyOnSeal;
78     private final boolean snapshotOnly;
79
80     private boolean builderBusy;
81
82     private volatile Exception operationFailure;
83
84     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
85             final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
86         super(parent, isDone);
87         this.snapshotOnly = snapshotOnly;
88         this.sendReadyOnSeal = sendReadyOnSeal;
89         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
90     }
91
92     @Override
93     boolean isSnapshotOnly() {
94         return snapshotOnly;
95     }
96
97     @Override
98     public TransactionIdentifier getIdentifier() {
99         return builder.getIdentifier();
100     }
101
102     @Override
103     void doDelete(final YangInstanceIdentifier path) {
104         appendModification(new TransactionDelete(path), Optional.empty());
105     }
106
107     @Override
108     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
109         appendModification(new TransactionMerge(path, data), Optional.empty());
110     }
111
112     @Override
113     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
114         appendModification(new TransactionWrite(path, data), Optional.empty());
115     }
116
117     private <T> FluentFuture<T> sendReadRequest(final AbstractReadTransactionRequest<?> request,
118             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
119         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
120         final Exception local = operationFailure;
121         if (local != null) {
122             return FluentFutures.immediateFailedFluentFuture(
123                     new ReadFailedException("Previous operation failed", local));
124         }
125
126         // Make sure we send any modifications before issuing a read
127         ensureFlushedBuider();
128         sendRequest(request, completer);
129         return FluentFuture.from(future);
130     }
131
132     @Override
133     FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
134         final SettableFuture<Boolean> future = SettableFuture.create();
135         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
136             isSnapshotOnly()), t -> completeExists(future, t), future);
137     }
138
139     @Override
140     FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(final YangInstanceIdentifier path) {
141         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
142         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
143             isSnapshotOnly()), t -> completeRead(future, t), future);
144     }
145
146     private void ensureInitializedBuilder() {
147         if (!builderBusy) {
148             builder.setSequence(nextSequence());
149             builderBusy = true;
150         }
151     }
152
153     private void ensureFlushedBuider() {
154         ensureFlushedBuider(Optional.empty());
155     }
156
157     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
158         if (builderBusy) {
159             flushBuilder(enqueuedTicks);
160         }
161     }
162
163     private void flushBuilder(final Optional<Long> enqueuedTicks) {
164         final ModifyTransactionRequest request = builder.build();
165         builderBusy = false;
166
167         sendModification(request, enqueuedTicks);
168     }
169
170     private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
171         if (enqueuedTicks.isPresent()) {
172             enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
173         } else {
174             sendRequest(request, response -> completeModify(request, response));
175         }
176     }
177
178     private void appendModification(final TransactionModification modification) {
179         appendModification(modification, Optional.empty());
180     }
181
182     private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
183         if (operationFailure == null) {
184             ensureInitializedBuilder();
185
186             builder.addModification(modification);
187             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
188                 flushBuilder(enqueuedTicks);
189             }
190         } else {
191             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
192         }
193     }
194
195     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
196         LOG.debug("Modification request {} completed with {}", request, response);
197
198         if (response instanceof TransactionSuccess) {
199             // Happy path
200             recordSuccessfulRequest(request);
201         } else {
202             recordFailedResponse(response, NOOP_EXCEPTION_MAPPER);
203         }
204     }
205
206     private <X extends Exception> X recordFailedResponse(final Response<?, ?> response,
207             final Function<Exception, X> exMapper) {
208         final Exception failure;
209         if (response instanceof RequestFailure) {
210             failure = ((RequestFailure<?, ?>) response).getCause();
211         } else {
212             LOG.warn("Unhandled response {}", response);
213             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
214         }
215
216         if (operationFailure == null) {
217             LOG.debug("Transaction {} failed", getIdentifier(), failure);
218             operationFailure = failure;
219         }
220         return exMapper.apply(failure);
221     }
222
223     private void failReadFuture(final SettableFuture<?> future, final Response<?, ?> response) {
224         future.setException(recordFailedResponse(response, ReadFailedException.MAPPER));
225     }
226
227     private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
228         LOG.debug("Exists request completed with {}", response);
229
230         if (response instanceof ExistsTransactionSuccess) {
231             future.set(((ExistsTransactionSuccess) response).getExists());
232         } else {
233             failReadFuture(future, response);
234         }
235
236         recordFinishedRequest(response);
237     }
238
239     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
240             final Response<?, ?> response) {
241         LOG.debug("Read request completed with {}", response);
242
243         if (response instanceof ReadTransactionSuccess) {
244             future.set(((ReadTransactionSuccess) response).getData());
245         } else {
246             failReadFuture(future, response);
247         }
248
249         recordFinishedRequest(response);
250     }
251
252     @Override
253     ModifyTransactionRequest abortRequest() {
254         ensureInitializedBuilder();
255         builder.setAbort();
256         builderBusy = false;
257         return builder.build();
258     }
259
260     @Override
261     ModifyTransactionRequest commitRequest(final boolean coordinated) {
262         ensureInitializedBuilder();
263         builder.setCommit(coordinated);
264         builderBusy = false;
265         return builder.build();
266     }
267
268     private ModifyTransactionRequest readyRequest() {
269         ensureInitializedBuilder();
270         builder.setReady();
271         builderBusy = false;
272         return builder.build();
273     }
274
275     @Override
276     boolean sealAndSend(final Optional<Long> enqueuedTicks) {
277         if (sendReadyOnSeal) {
278             ensureInitializedBuilder();
279             builder.setReady();
280             flushBuilder(enqueuedTicks);
281         }
282         return super.sealAndSend(enqueuedTicks);
283     }
284
285     @Override
286     java.util.Optional<ModifyTransactionRequest> flushState() {
287         if (!builderBusy) {
288             return java.util.Optional.empty();
289         }
290
291         final ModifyTransactionRequest request = builder.build();
292         builderBusy = false;
293         return java.util.Optional.of(request);
294     }
295
296     @Override
297     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
298             final Consumer<Response<?, ?>> callback) {
299         successor.handleForwardedRequest(request, callback);
300     }
301
302     void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
303         if (request instanceof ModifyTransactionRequest) {
304             handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
305         } else if (request instanceof ReadTransactionRequest) {
306             ensureFlushedBuider();
307             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
308                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
309                     recordFinishedRequest(resp);
310                     callback.accept(resp);
311                 });
312         } else if (request instanceof ExistsTransactionRequest) {
313             ensureFlushedBuider();
314             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
315                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
316                     recordFinishedRequest(resp);
317                     callback.accept(resp);
318                 });
319         } else if (request instanceof TransactionPreCommitRequest) {
320             ensureFlushedBuider();
321             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
322                 localActor());
323             sendRequest(tmp, resp -> {
324                 recordSuccessfulRequest(tmp);
325                 callback.accept(resp);
326             });
327         } else if (request instanceof TransactionDoCommitRequest) {
328             ensureFlushedBuider();
329             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
330         } else if (request instanceof TransactionAbortRequest) {
331             ensureFlushedBuider();
332             sendDoAbort(callback);
333         } else if (request instanceof TransactionPurgeRequest) {
334             enqueuePurge(callback);
335         } else {
336             throw new IllegalArgumentException("Unhandled request {}" + request);
337         }
338     }
339
340     private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
341             final ModifyTransactionRequest req) {
342         req.getModifications().forEach(this::appendModification);
343
344         final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
345         if (maybeProto.isPresent()) {
346             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
347             // until we know what we are going to do.
348             if (markSealed()) {
349                 if (!sealOnly()) {
350                     LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this);
351                 }
352             }
353
354             final TransactionRequest<?> tmp;
355             switch (maybeProto.get()) {
356                 case ABORT:
357                     tmp = abortRequest();
358                     sendRequest(tmp, resp -> {
359                         completeModify(tmp, resp);
360                         callback.accept(resp);
361                     });
362                     break;
363                 case SIMPLE:
364                     tmp = commitRequest(false);
365                     sendRequest(tmp, resp -> {
366                         completeModify(tmp, resp);
367                         callback.accept(resp);
368                     });
369                     break;
370                 case THREE_PHASE:
371                     tmp = commitRequest(true);
372                     sendRequest(tmp, resp -> {
373                         recordSuccessfulRequest(tmp);
374                         callback.accept(resp);
375                     });
376                     break;
377                 case READY:
378                     tmp = readyRequest();
379                     sendRequest(tmp, resp -> {
380                         recordSuccessfulRequest(tmp);
381                         callback.accept(resp);
382                     });
383                     break;
384                 default:
385                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
386             }
387         }
388     }
389
390     @Override
391     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
392             final Consumer<Response<?, ?>> callback) {
393         successor.handleForwardedRemoteRequest(request, callback);
394     }
395
396     @Override
397     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
398             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
399         if (request instanceof CommitLocalTransactionRequest) {
400             replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
401         } else if (request instanceof AbortLocalTransactionRequest) {
402             enqueueRequest(abortRequest(), callback, enqueuedTicks);
403         } else {
404             throw new IllegalStateException("Unhandled request " + request);
405         }
406     }
407
408     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
409             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
410         final DataTreeModification mod = request.getModification();
411         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
412
413         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
414             @Override
415             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
416                 appendModification(new TransactionWrite(current().node(child), data), optTicks);
417             }
418
419             @Override
420             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
421                 appendModification(new TransactionMerge(current().node(child), data), optTicks);
422             }
423
424             @Override
425             public void delete(final PathArgument child) {
426                 appendModification(new TransactionDelete(current().node(child)), optTicks);
427             }
428         });
429
430         enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
431     }
432
433     @Override
434     void handleReplayedRemoteRequest(final TransactionRequest<?> request,
435             @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
436         final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
437         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
438
439         if (request instanceof ModifyTransactionRequest) {
440             handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
441         } else if (request instanceof ReadTransactionRequest) {
442             ensureFlushedBuider(optTicks);
443             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
444                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
445                     recordFinishedRequest(resp);
446                     cb.accept(resp);
447                 }, enqueuedTicks);
448         } else if (request instanceof ExistsTransactionRequest) {
449             ensureFlushedBuider(optTicks);
450             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
451                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
452                     recordFinishedRequest(resp);
453                     cb.accept(resp);
454                 }, enqueuedTicks);
455         } else if (request instanceof TransactionPreCommitRequest) {
456             ensureFlushedBuider(optTicks);
457             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
458                 localActor());
459             enqueueRequest(tmp, resp -> {
460                 recordSuccessfulRequest(tmp);
461                 cb.accept(resp);
462             }, enqueuedTicks);
463         } else if (request instanceof TransactionDoCommitRequest) {
464             ensureFlushedBuider(optTicks);
465             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
466                 enqueuedTicks);
467         } else if (request instanceof TransactionAbortRequest) {
468             ensureFlushedBuider(optTicks);
469             enqueueDoAbort(callback, enqueuedTicks);
470         } else if (request instanceof TransactionPurgeRequest) {
471             enqueuePurge(callback, enqueuedTicks);
472         } else if (request instanceof IncrementTransactionSequenceRequest) {
473             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
474             ensureFlushedBuider(optTicks);
475             enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
476                 snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
477             incrementSequence(req.getIncrement());
478         } else {
479             throw new IllegalArgumentException("Unhandled request {}" + request);
480         }
481     }
482
483     private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
484             final ModifyTransactionRequest req) {
485         req.getModifications().forEach(this::appendModification);
486
487         final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
488         if (maybeProto.isPresent()) {
489             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
490             // until we know what we are going to do.
491             if (markSealed()) {
492                 verify(sealOnly(), "Attempted to replay seal on %s", this);
493             }
494
495             final TransactionRequest<?> tmp;
496             switch (maybeProto.get()) {
497                 case ABORT:
498                     tmp = abortRequest();
499                     enqueueRequest(tmp, resp -> {
500                         completeModify(tmp, resp);
501                         cb.accept(resp);
502                     }, enqueuedTicks);
503                     break;
504                 case SIMPLE:
505                     tmp = commitRequest(false);
506                     enqueueRequest(tmp, resp -> {
507                         completeModify(tmp, resp);
508                         cb.accept(resp);
509                     }, enqueuedTicks);
510                     break;
511                 case THREE_PHASE:
512                     tmp = commitRequest(true);
513                     enqueueRequest(tmp, resp -> {
514                         recordSuccessfulRequest(tmp);
515                         cb.accept(resp);
516                     }, enqueuedTicks);
517                     break;
518                 case READY:
519                     tmp = readyRequest();
520                     enqueueRequest(tmp, resp -> {
521                         recordSuccessfulRequest(tmp);
522                         cb.accept(resp);
523                     }, enqueuedTicks);
524                     break;
525                 default:
526                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
527             }
528         }
529     }
530 }