d0d7946f62451cca0f6c34750166ef692cbc272d
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Verify.verify;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.Optional;
16 import java.util.OptionalLong;
17 import java.util.function.Consumer;
18 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
25 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
26 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
28 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
29 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
30 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
41 import org.opendaylight.controller.cluster.access.concepts.RequestException;
42 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
43 import org.opendaylight.controller.cluster.access.concepts.Response;
44 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
45 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
46 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
47 import org.opendaylight.mdsal.common.api.ReadFailedException;
48 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
58  * not known or is known to be not co-located with the client.
59  *
60  * <p>
61  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
62  * maintaining any submitted operations until the leader is discovered.
63  *
64  * <p>
65  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
66  * transitions based on backend responses are thread-safe.
67  *
68  * @author Robert Varga
69  */
70 final class RemoteProxyTransaction extends AbstractProxyTransaction {
71     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
72
73     // FIXME: make this tuneable
74     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
75
76     private final ModifyTransactionRequestBuilder builder;
77     private final boolean sendReadyOnSeal;
78     private final boolean snapshotOnly;
79
80     private boolean builderBusy;
81
82     private volatile Exception operationFailure;
83
84     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
85             final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
86         super(parent, isDone);
87         this.snapshotOnly = snapshotOnly;
88         this.sendReadyOnSeal = sendReadyOnSeal;
89         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
90     }
91
92     @Override
93     boolean isSnapshotOnly() {
94         return snapshotOnly;
95     }
96
97     @Override
98     public TransactionIdentifier getIdentifier() {
99         return builder.getIdentifier();
100     }
101
102     @Override
103     void doDelete(final YangInstanceIdentifier path) {
104         appendModification(new TransactionDelete(path), OptionalLong.empty());
105     }
106
107     @Override
108     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
109         appendModification(new TransactionMerge(path, data), OptionalLong.empty());
110     }
111
112     @Override
113     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
114         appendModification(new TransactionWrite(path, data), OptionalLong.empty());
115     }
116
117     private <T> FluentFuture<T> sendReadRequest(final AbstractReadTransactionRequest<?> request,
118             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
119         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
120         final Exception local = operationFailure;
121         if (local != null) {
122             return FluentFutures.immediateFailedFluentFuture(
123                     new ReadFailedException("Previous operation failed", local));
124         }
125
126         // Make sure we send any modifications before issuing a read
127         ensureFlushedBuider();
128         sendRequest(request, completer);
129         return FluentFuture.from(future);
130     }
131
132     @Override
133     FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
134         final SettableFuture<Boolean> future = SettableFuture.create();
135         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
136             isSnapshotOnly()), t -> completeExists(path, future, t), future);
137     }
138
139     @Override
140     FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(final YangInstanceIdentifier path) {
141         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
142         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
143             isSnapshotOnly()), t -> completeRead(path, future, t), future);
144     }
145
146     private void ensureInitializedBuilder() {
147         if (!builderBusy) {
148             builder.setSequence(nextSequence());
149             builderBusy = true;
150         }
151     }
152
153     private void ensureFlushedBuider() {
154         ensureFlushedBuider(OptionalLong.empty());
155     }
156
157     private void ensureFlushedBuider(final OptionalLong enqueuedTicks) {
158         if (builderBusy) {
159             flushBuilder(enqueuedTicks);
160         }
161     }
162
163     private void flushBuilder(final OptionalLong enqueuedTicks) {
164         final ModifyTransactionRequest request = builder.build();
165         builderBusy = false;
166
167         sendModification(request, enqueuedTicks);
168     }
169
170     private void sendModification(final TransactionRequest<?> request, final OptionalLong enqueuedTicks) {
171         if (enqueuedTicks.isPresent()) {
172             enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.getAsLong());
173         } else {
174             sendRequest(request, response -> completeModify(request, response));
175         }
176     }
177
178     private void appendModification(final TransactionModification modification) {
179         appendModification(modification, OptionalLong.empty());
180     }
181
182     private void appendModification(final TransactionModification modification, final OptionalLong enqueuedTicks) {
183         if (operationFailure == null) {
184             ensureInitializedBuilder();
185
186             builder.addModification(modification);
187             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
188                 flushBuilder(enqueuedTicks);
189             }
190         } else {
191             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
192         }
193     }
194
195     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
196         LOG.debug("Modification request {} completed with {}", request, response);
197
198         if (response instanceof TransactionSuccess) {
199             // Happy path
200             recordSuccessfulRequest(request);
201         } else {
202             recordFailedResponse(response);
203         }
204     }
205
206     private Exception recordFailedResponse(final Response<?, ?> response) {
207         final Exception failure;
208         if (response instanceof RequestFailure) {
209             final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
210             failure = cause instanceof RequestTimeoutException
211                     ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause;
212         } else {
213             LOG.warn("Unhandled response {}", response);
214             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
215         }
216
217         if (operationFailure == null) {
218             LOG.debug("Transaction {} failed", getIdentifier(), failure);
219             operationFailure = failure;
220         }
221         return failure;
222     }
223
224     private void failReadFuture(final SettableFuture<?> future, final String message,
225             final Response<?, ?> response) {
226         future.setException(new ReadFailedException(message, recordFailedResponse(response)));
227     }
228
229     private void completeExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> future,
230             final Response<?, ?> response) {
231         LOG.debug("Exists request for {} completed with {}", path, response);
232
233         if (response instanceof ExistsTransactionSuccess) {
234             future.set(((ExistsTransactionSuccess) response).getExists());
235         } else {
236             failReadFuture(future, "Error executing exists request for path " + path, response);
237         }
238
239         recordFinishedRequest(response);
240     }
241
242     private void completeRead(final YangInstanceIdentifier path,
243             final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
244         LOG.debug("Read request for {} completed with {}", path, response);
245
246         if (response instanceof ReadTransactionSuccess) {
247             future.set(((ReadTransactionSuccess) response).getData());
248         } else {
249             failReadFuture(future, "Error reading data for path " + path, response);
250         }
251
252         recordFinishedRequest(response);
253     }
254
255     @Override
256     ModifyTransactionRequest abortRequest() {
257         ensureInitializedBuilder();
258         builder.setAbort();
259         builderBusy = false;
260         return builder.build();
261     }
262
263     @Override
264     ModifyTransactionRequest commitRequest(final boolean coordinated) {
265         ensureInitializedBuilder();
266         builder.setCommit(coordinated);
267         builderBusy = false;
268         return builder.build();
269     }
270
271     private ModifyTransactionRequest readyRequest() {
272         ensureInitializedBuilder();
273         builder.setReady();
274         builderBusy = false;
275         return builder.build();
276     }
277
278     @Override
279     boolean sealAndSend(final OptionalLong enqueuedTicks) {
280         if (sendReadyOnSeal) {
281             ensureInitializedBuilder();
282             builder.setReady();
283             flushBuilder(enqueuedTicks);
284         }
285         return super.sealAndSend(enqueuedTicks);
286     }
287
288     @Override
289     Optional<ModifyTransactionRequest> flushState() {
290         if (!builderBusy) {
291             return Optional.empty();
292         }
293
294         final ModifyTransactionRequest request = builder.build();
295         builderBusy = false;
296         return Optional.of(request);
297     }
298
299     @Override
300     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
301             final Consumer<Response<?, ?>> callback) {
302         successor.handleForwardedRequest(request, callback);
303     }
304
305     void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
306         if (request instanceof ModifyTransactionRequest) {
307             handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
308         } else if (request instanceof ReadTransactionRequest) {
309             ensureFlushedBuider();
310             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
311                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
312                     recordFinishedRequest(resp);
313                     callback.accept(resp);
314                 });
315         } else if (request instanceof ExistsTransactionRequest) {
316             ensureFlushedBuider();
317             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
318                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
319                     recordFinishedRequest(resp);
320                     callback.accept(resp);
321                 });
322         } else if (request instanceof TransactionPreCommitRequest) {
323             ensureFlushedBuider();
324             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
325                 localActor());
326             sendRequest(tmp, resp -> {
327                 recordSuccessfulRequest(tmp);
328                 callback.accept(resp);
329             });
330         } else if (request instanceof TransactionDoCommitRequest) {
331             ensureFlushedBuider();
332             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
333         } else if (request instanceof TransactionAbortRequest) {
334             ensureFlushedBuider();
335             sendDoAbort(callback);
336         } else if (request instanceof TransactionPurgeRequest) {
337             enqueuePurge(callback);
338         } else {
339             throw new IllegalArgumentException("Unhandled request {}" + request);
340         }
341     }
342
343     private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
344             final ModifyTransactionRequest req) {
345         req.getModifications().forEach(this::appendModification);
346
347         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
348         if (maybeProto.isPresent()) {
349             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
350             // until we know what we are going to do.
351             if (markSealed()) {
352                 if (!sealOnly()) {
353                     LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this);
354                 }
355             }
356
357             final TransactionRequest<?> tmp;
358             switch (maybeProto.get()) {
359                 case ABORT:
360                     tmp = abortRequest();
361                     sendRequest(tmp, resp -> {
362                         completeModify(tmp, resp);
363                         callback.accept(resp);
364                     });
365                     break;
366                 case SIMPLE:
367                     tmp = commitRequest(false);
368                     sendRequest(tmp, resp -> {
369                         completeModify(tmp, resp);
370                         callback.accept(resp);
371                     });
372                     break;
373                 case THREE_PHASE:
374                     tmp = commitRequest(true);
375                     sendRequest(tmp, resp -> {
376                         recordSuccessfulRequest(tmp);
377                         callback.accept(resp);
378                     });
379                     break;
380                 case READY:
381                     tmp = readyRequest();
382                     sendRequest(tmp, resp -> {
383                         recordSuccessfulRequest(tmp);
384                         callback.accept(resp);
385                     });
386                     break;
387                 default:
388                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
389             }
390         }
391     }
392
393     @Override
394     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
395             final Consumer<Response<?, ?>> callback) {
396         successor.handleForwardedRemoteRequest(request, callback);
397     }
398
399     @Override
400     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
401             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
402         if (request instanceof CommitLocalTransactionRequest) {
403             replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
404         } else if (request instanceof AbortLocalTransactionRequest) {
405             enqueueRequest(abortRequest(), callback, enqueuedTicks);
406         } else {
407             throw new IllegalStateException("Unhandled request " + request);
408         }
409     }
410
411     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
412             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
413         final DataTreeModification mod = request.getModification();
414         final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
415
416         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
417             @Override
418             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
419                 appendModification(new TransactionWrite(current().node(child), data), optTicks);
420             }
421
422             @Override
423             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
424                 appendModification(new TransactionMerge(current().node(child), data), optTicks);
425             }
426
427             @Override
428             public void delete(final PathArgument child) {
429                 appendModification(new TransactionDelete(current().node(child)), optTicks);
430             }
431         });
432
433         enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
434     }
435
436     @Override
437     void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
438             final long enqueuedTicks) {
439         final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
440         final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
441
442         if (request instanceof ModifyTransactionRequest) {
443             handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
444         } else if (request instanceof ReadTransactionRequest) {
445             ensureFlushedBuider(optTicks);
446             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
447                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
448                     recordFinishedRequest(resp);
449                     cb.accept(resp);
450                 }, enqueuedTicks);
451         } else if (request instanceof ExistsTransactionRequest) {
452             ensureFlushedBuider(optTicks);
453             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
454                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
455                     recordFinishedRequest(resp);
456                     cb.accept(resp);
457                 }, enqueuedTicks);
458         } else if (request instanceof TransactionPreCommitRequest) {
459             ensureFlushedBuider(optTicks);
460             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
461                 localActor());
462             enqueueRequest(tmp, resp -> {
463                 recordSuccessfulRequest(tmp);
464                 cb.accept(resp);
465             }, enqueuedTicks);
466         } else if (request instanceof TransactionDoCommitRequest) {
467             ensureFlushedBuider(optTicks);
468             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
469                 enqueuedTicks);
470         } else if (request instanceof TransactionAbortRequest) {
471             ensureFlushedBuider(optTicks);
472             enqueueDoAbort(callback, enqueuedTicks);
473         } else if (request instanceof TransactionPurgeRequest) {
474             enqueuePurge(callback, enqueuedTicks);
475         } else if (request instanceof IncrementTransactionSequenceRequest) {
476             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
477             ensureFlushedBuider(optTicks);
478             enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
479                 snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
480             incrementSequence(req.getIncrement());
481         } else {
482             throw new IllegalArgumentException("Unhandled request {}" + request);
483         }
484     }
485
486     private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
487             final ModifyTransactionRequest req) {
488         req.getModifications().forEach(this::appendModification);
489
490         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
491         if (maybeProto.isPresent()) {
492             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
493             // until we know what we are going to do.
494             if (markSealed()) {
495                 verify(sealOnly(), "Attempted to replay seal on %s", this);
496             }
497
498             final TransactionRequest<?> tmp;
499             switch (maybeProto.get()) {
500                 case ABORT:
501                     tmp = abortRequest();
502                     enqueueRequest(tmp, resp -> {
503                         completeModify(tmp, resp);
504                         cb.accept(resp);
505                     }, enqueuedTicks);
506                     break;
507                 case SIMPLE:
508                     tmp = commitRequest(false);
509                     enqueueRequest(tmp, resp -> {
510                         completeModify(tmp, resp);
511                         cb.accept(resp);
512                     }, enqueuedTicks);
513                     break;
514                 case THREE_PHASE:
515                     tmp = commitRequest(true);
516                     enqueueRequest(tmp, resp -> {
517                         recordSuccessfulRequest(tmp);
518                         cb.accept(resp);
519                     }, enqueuedTicks);
520                     break;
521                 case READY:
522                     tmp = readyRequest();
523                     enqueueRequest(tmp, resp -> {
524                         recordSuccessfulRequest(tmp);
525                         cb.accept(resp);
526                     }, enqueuedTicks);
527                     break;
528                 default:
529                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
530             }
531         }
532     }
533 }