8bdf3599463684ad86d1df361324faaf154e77da
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Verify.verify;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.Optional;
16 import java.util.function.Consumer;
17 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
18 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
24 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
25 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
26 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
27 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
28 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
29 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
34 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
35 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
39 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
40 import org.opendaylight.controller.cluster.access.concepts.RequestException;
41 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
42 import org.opendaylight.controller.cluster.access.concepts.Response;
43 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
44 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
45 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
46 import org.opendaylight.mdsal.common.api.ReadFailedException;
47 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
57  * not known or is known to be not co-located with the client.
58  *
59  * <p>
60  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
61  * maintaining any submitted operations until the leader is discovered.
62  *
63  * <p>
64  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
65  * transitions based on backend responses are thread-safe.
66  *
67  * @author Robert Varga
68  */
69 final class RemoteProxyTransaction extends AbstractProxyTransaction {
70     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
71
72     // FIXME: make this tuneable
73     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
74
75     private final ModifyTransactionRequestBuilder builder;
76     private final boolean sendReadyOnSeal;
77     private final boolean snapshotOnly;
78
79     private boolean builderBusy;
80
81     private volatile Exception operationFailure;
82
83     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
84             final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
85         super(parent, isDone);
86         this.snapshotOnly = snapshotOnly;
87         this.sendReadyOnSeal = sendReadyOnSeal;
88         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
89     }
90
91     @Override
92     boolean isSnapshotOnly() {
93         return snapshotOnly;
94     }
95
96     @Override
97     public TransactionIdentifier getIdentifier() {
98         return builder.getIdentifier();
99     }
100
101     @Override
102     void doDelete(final YangInstanceIdentifier path) {
103         appendModification(new TransactionDelete(path), Optional.empty());
104     }
105
106     @Override
107     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
108         appendModification(new TransactionMerge(path, data), Optional.empty());
109     }
110
111     @Override
112     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
113         appendModification(new TransactionWrite(path, data), Optional.empty());
114     }
115
116     private <T> FluentFuture<T> sendReadRequest(final AbstractReadTransactionRequest<?> request,
117             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
118         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
119         final Exception local = operationFailure;
120         if (local != null) {
121             return FluentFutures.immediateFailedFluentFuture(
122                     new ReadFailedException("Previous operation failed", local));
123         }
124
125         // Make sure we send any modifications before issuing a read
126         ensureFlushedBuider();
127         sendRequest(request, completer);
128         return FluentFuture.from(future);
129     }
130
131     @Override
132     FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
133         final SettableFuture<Boolean> future = SettableFuture.create();
134         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
135             isSnapshotOnly()), t -> completeExists(path, future, t), future);
136     }
137
138     @Override
139     FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(final YangInstanceIdentifier path) {
140         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
141         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
142             isSnapshotOnly()), t -> completeRead(path, future, t), future);
143     }
144
145     private void ensureInitializedBuilder() {
146         if (!builderBusy) {
147             builder.setSequence(nextSequence());
148             builderBusy = true;
149         }
150     }
151
152     private void ensureFlushedBuider() {
153         ensureFlushedBuider(Optional.empty());
154     }
155
156     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
157         if (builderBusy) {
158             flushBuilder(enqueuedTicks);
159         }
160     }
161
162     private void flushBuilder(final Optional<Long> enqueuedTicks) {
163         final ModifyTransactionRequest request = builder.build();
164         builderBusy = false;
165
166         sendModification(request, enqueuedTicks);
167     }
168
169     private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
170         if (enqueuedTicks.isPresent()) {
171             enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
172         } else {
173             sendRequest(request, response -> completeModify(request, response));
174         }
175     }
176
177     private void appendModification(final TransactionModification modification) {
178         appendModification(modification, Optional.empty());
179     }
180
181     private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
182         if (operationFailure == null) {
183             ensureInitializedBuilder();
184
185             builder.addModification(modification);
186             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
187                 flushBuilder(enqueuedTicks);
188             }
189         } else {
190             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
191         }
192     }
193
194     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
195         LOG.debug("Modification request {} completed with {}", request, response);
196
197         if (response instanceof TransactionSuccess) {
198             // Happy path
199             recordSuccessfulRequest(request);
200         } else {
201             recordFailedResponse(response);
202         }
203     }
204
205     private Exception recordFailedResponse(final Response<?, ?> response) {
206         final Exception failure;
207         if (response instanceof RequestFailure) {
208             final RequestException cause = ((RequestFailure<?, ?>) response).getCause();
209             failure = cause instanceof RequestTimeoutException
210                     ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause;
211         } else {
212             LOG.warn("Unhandled response {}", response);
213             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
214         }
215
216         if (operationFailure == null) {
217             LOG.debug("Transaction {} failed", getIdentifier(), failure);
218             operationFailure = failure;
219         }
220         return failure;
221     }
222
223     private void failReadFuture(final SettableFuture<?> future, final String message,
224             final Response<?, ?> response) {
225         future.setException(new ReadFailedException(message, recordFailedResponse(response)));
226     }
227
228     private void completeExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> future,
229             final Response<?, ?> response) {
230         LOG.debug("Exists request for {} completed with {}", path, response);
231
232         if (response instanceof ExistsTransactionSuccess) {
233             future.set(((ExistsTransactionSuccess) response).getExists());
234         } else {
235             failReadFuture(future, "Error executing exists request for path " + path, response);
236         }
237
238         recordFinishedRequest(response);
239     }
240
241     private void completeRead(final YangInstanceIdentifier path,
242             final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
243         LOG.debug("Read request for {} completed with {}", path, response);
244
245         if (response instanceof ReadTransactionSuccess) {
246             future.set(((ReadTransactionSuccess) response).getData());
247         } else {
248             failReadFuture(future, "Error reading data for path " + path, response);
249         }
250
251         recordFinishedRequest(response);
252     }
253
254     @Override
255     ModifyTransactionRequest abortRequest() {
256         ensureInitializedBuilder();
257         builder.setAbort();
258         builderBusy = false;
259         return builder.build();
260     }
261
262     @Override
263     ModifyTransactionRequest commitRequest(final boolean coordinated) {
264         ensureInitializedBuilder();
265         builder.setCommit(coordinated);
266         builderBusy = false;
267         return builder.build();
268     }
269
270     private ModifyTransactionRequest readyRequest() {
271         ensureInitializedBuilder();
272         builder.setReady();
273         builderBusy = false;
274         return builder.build();
275     }
276
277     @Override
278     boolean sealAndSend(final Optional<Long> enqueuedTicks) {
279         if (sendReadyOnSeal) {
280             ensureInitializedBuilder();
281             builder.setReady();
282             flushBuilder(enqueuedTicks);
283         }
284         return super.sealAndSend(enqueuedTicks);
285     }
286
287     @Override
288     Optional<ModifyTransactionRequest> flushState() {
289         if (!builderBusy) {
290             return Optional.empty();
291         }
292
293         final ModifyTransactionRequest request = builder.build();
294         builderBusy = false;
295         return Optional.of(request);
296     }
297
298     @Override
299     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
300             final Consumer<Response<?, ?>> callback) {
301         successor.handleForwardedRequest(request, callback);
302     }
303
304     void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
305         if (request instanceof ModifyTransactionRequest) {
306             handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
307         } else if (request instanceof ReadTransactionRequest) {
308             ensureFlushedBuider();
309             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
310                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
311                     recordFinishedRequest(resp);
312                     callback.accept(resp);
313                 });
314         } else if (request instanceof ExistsTransactionRequest) {
315             ensureFlushedBuider();
316             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
317                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
318                     recordFinishedRequest(resp);
319                     callback.accept(resp);
320                 });
321         } else if (request instanceof TransactionPreCommitRequest) {
322             ensureFlushedBuider();
323             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
324                 localActor());
325             sendRequest(tmp, resp -> {
326                 recordSuccessfulRequest(tmp);
327                 callback.accept(resp);
328             });
329         } else if (request instanceof TransactionDoCommitRequest) {
330             ensureFlushedBuider();
331             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
332         } else if (request instanceof TransactionAbortRequest) {
333             ensureFlushedBuider();
334             sendDoAbort(callback);
335         } else if (request instanceof TransactionPurgeRequest) {
336             enqueuePurge(callback);
337         } else {
338             throw new IllegalArgumentException("Unhandled request {}" + request);
339         }
340     }
341
342     private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
343             final ModifyTransactionRequest req) {
344         req.getModifications().forEach(this::appendModification);
345
346         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
347         if (maybeProto.isPresent()) {
348             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
349             // until we know what we are going to do.
350             if (markSealed()) {
351                 if (!sealOnly()) {
352                     LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this);
353                 }
354             }
355
356             final TransactionRequest<?> tmp;
357             switch (maybeProto.get()) {
358                 case ABORT:
359                     tmp = abortRequest();
360                     sendRequest(tmp, resp -> {
361                         completeModify(tmp, resp);
362                         callback.accept(resp);
363                     });
364                     break;
365                 case SIMPLE:
366                     tmp = commitRequest(false);
367                     sendRequest(tmp, resp -> {
368                         completeModify(tmp, resp);
369                         callback.accept(resp);
370                     });
371                     break;
372                 case THREE_PHASE:
373                     tmp = commitRequest(true);
374                     sendRequest(tmp, resp -> {
375                         recordSuccessfulRequest(tmp);
376                         callback.accept(resp);
377                     });
378                     break;
379                 case READY:
380                     tmp = readyRequest();
381                     sendRequest(tmp, resp -> {
382                         recordSuccessfulRequest(tmp);
383                         callback.accept(resp);
384                     });
385                     break;
386                 default:
387                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
388             }
389         }
390     }
391
392     @Override
393     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
394             final Consumer<Response<?, ?>> callback) {
395         successor.handleForwardedRemoteRequest(request, callback);
396     }
397
398     @Override
399     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
400             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
401         if (request instanceof CommitLocalTransactionRequest) {
402             replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
403         } else if (request instanceof AbortLocalTransactionRequest) {
404             enqueueRequest(abortRequest(), callback, enqueuedTicks);
405         } else {
406             throw new IllegalStateException("Unhandled request " + request);
407         }
408     }
409
410     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
411             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
412         final DataTreeModification mod = request.getModification();
413         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
414
415         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
416             @Override
417             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
418                 appendModification(new TransactionWrite(current().node(child), data), optTicks);
419             }
420
421             @Override
422             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
423                 appendModification(new TransactionMerge(current().node(child), data), optTicks);
424             }
425
426             @Override
427             public void delete(final PathArgument child) {
428                 appendModification(new TransactionDelete(current().node(child)), optTicks);
429             }
430         });
431
432         enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
433     }
434
435     @Override
436     void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
437             final long enqueuedTicks) {
438         final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
439         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
440
441         if (request instanceof ModifyTransactionRequest) {
442             handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
443         } else if (request instanceof ReadTransactionRequest) {
444             ensureFlushedBuider(optTicks);
445             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
446                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
447                     recordFinishedRequest(resp);
448                     cb.accept(resp);
449                 }, enqueuedTicks);
450         } else if (request instanceof ExistsTransactionRequest) {
451             ensureFlushedBuider(optTicks);
452             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
453                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
454                     recordFinishedRequest(resp);
455                     cb.accept(resp);
456                 }, enqueuedTicks);
457         } else if (request instanceof TransactionPreCommitRequest) {
458             ensureFlushedBuider(optTicks);
459             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
460                 localActor());
461             enqueueRequest(tmp, resp -> {
462                 recordSuccessfulRequest(tmp);
463                 cb.accept(resp);
464             }, enqueuedTicks);
465         } else if (request instanceof TransactionDoCommitRequest) {
466             ensureFlushedBuider(optTicks);
467             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
468                 enqueuedTicks);
469         } else if (request instanceof TransactionAbortRequest) {
470             ensureFlushedBuider(optTicks);
471             enqueueDoAbort(callback, enqueuedTicks);
472         } else if (request instanceof TransactionPurgeRequest) {
473             enqueuePurge(callback, enqueuedTicks);
474         } else if (request instanceof IncrementTransactionSequenceRequest) {
475             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
476             ensureFlushedBuider(optTicks);
477             enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
478                 snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
479             incrementSequence(req.getIncrement());
480         } else {
481             throw new IllegalArgumentException("Unhandled request {}" + request);
482         }
483     }
484
485     private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
486             final ModifyTransactionRequest req) {
487         req.getModifications().forEach(this::appendModification);
488
489         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
490         if (maybeProto.isPresent()) {
491             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
492             // until we know what we are going to do.
493             if (markSealed()) {
494                 verify(sealOnly(), "Attempted to replay seal on %s", this);
495             }
496
497             final TransactionRequest<?> tmp;
498             switch (maybeProto.get()) {
499                 case ABORT:
500                     tmp = abortRequest();
501                     enqueueRequest(tmp, resp -> {
502                         completeModify(tmp, resp);
503                         cb.accept(resp);
504                     }, enqueuedTicks);
505                     break;
506                 case SIMPLE:
507                     tmp = commitRequest(false);
508                     enqueueRequest(tmp, resp -> {
509                         completeModify(tmp, resp);
510                         cb.accept(resp);
511                     }, enqueuedTicks);
512                     break;
513                 case THREE_PHASE:
514                     tmp = commitRequest(true);
515                     enqueueRequest(tmp, resp -> {
516                         recordSuccessfulRequest(tmp);
517                         cb.accept(resp);
518                     }, enqueuedTicks);
519                     break;
520                 case READY:
521                     tmp = readyRequest();
522                     enqueueRequest(tmp, resp -> {
523                         recordSuccessfulRequest(tmp);
524                         cb.accept(resp);
525                     }, enqueuedTicks);
526                     break;
527                 default:
528                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
529             }
530         }
531     }
532 }