BUG-8422: Propagate enqueue time
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.function.Consumer;
16 import javax.annotation.Nullable;
17 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
18 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
23 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
25 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
26 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
29 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
30 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
32 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
33 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
37 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
38 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
39 import org.opendaylight.controller.cluster.access.concepts.Response;
40 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
41 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
42 import org.opendaylight.mdsal.common.api.ReadFailedException;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
53  * not known or is known to be not co-located with the client.
54  *
55  * <p>
56  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
57  * maintaining any submitted operations until the leader is discovered.
58  *
59  * <p>
60  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
61  * transitions based on backend responses are thread-safe.
62  *
63  * @author Robert Varga
64  */
65 final class RemoteProxyTransaction extends AbstractProxyTransaction {
66     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
67
68     // FIXME: make this tuneable
69     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
70
71     private final ModifyTransactionRequestBuilder builder;
72     private final boolean sendReadyOnSeal;
73     private final boolean snapshotOnly;
74
75     private boolean builderBusy;
76
77     private volatile Exception operationFailure;
78
79     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
80             final boolean snapshotOnly, final boolean sendReadyOnSeal) {
81         super(parent);
82         this.snapshotOnly = snapshotOnly;
83         this.sendReadyOnSeal = sendReadyOnSeal;
84         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
85     }
86
87     @Override
88     boolean isSnapshotOnly() {
89         return snapshotOnly;
90     }
91
92     @Override
93     public TransactionIdentifier getIdentifier() {
94         return builder.getIdentifier();
95     }
96
97     @Override
98     void doDelete(final YangInstanceIdentifier path) {
99         appendModification(new TransactionDelete(path), Optional.absent());
100     }
101
102     @Override
103     void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
104         appendModification(new TransactionMerge(path, data), Optional.absent());
105     }
106
107     @Override
108     void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
109         appendModification(new TransactionWrite(path, data), Optional.absent());
110     }
111
112     private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
113             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
114         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
115         final Exception local = operationFailure;
116         if (local != null) {
117             return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local));
118         }
119
120         // Make sure we send any modifications before issuing a read
121         ensureFlushedBuider();
122         sendRequest(request, completer);
123         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
124     }
125
126     @Override
127     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
128         final SettableFuture<Boolean> future = SettableFuture.create();
129         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
130             isSnapshotOnly()), t -> completeExists(future, t), future);
131     }
132
133     @Override
134     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
135         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
136         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
137             isSnapshotOnly()), t -> completeRead(future, t), future);
138     }
139
140     @Override
141     void doAbort() {
142         ensureInitializedBuilder();
143         builder.setAbort();
144         flushBuilder();
145     }
146
147     private void ensureInitializedBuilder() {
148         if (!builderBusy) {
149             builder.setSequence(nextSequence());
150             builderBusy = true;
151         }
152     }
153
154     private void ensureFlushedBuider() {
155         if (builderBusy) {
156             flushBuilder();
157         }
158     }
159
160     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
161         if (builderBusy) {
162             flushBuilder(enqueuedTicks);
163         }
164     }
165
166     private void flushBuilder() {
167         flushBuilder(Optional.absent());
168     }
169
170     private void flushBuilder(final Optional<Long> enqueuedTicks) {
171         final ModifyTransactionRequest request = builder.build();
172         builderBusy = false;
173
174         sendModification(request, enqueuedTicks);
175     }
176
177     private void sendModification(final TransactionRequest<?> request, final Optional<Long> enqueuedTicks) {
178         if (enqueuedTicks.isPresent()) {
179             enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue());
180         } else {
181             sendRequest(request, response -> completeModify(request, response));
182         }
183     }
184
185     private void appendModification(final TransactionModification modification) {
186         appendModification(modification, Optional.absent());
187     }
188
189     private void appendModification(final TransactionModification modification, final Optional<Long> enqueuedTicks) {
190         if (operationFailure == null) {
191             ensureInitializedBuilder();
192
193             builder.addModification(modification);
194             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
195                 flushBuilder(enqueuedTicks);
196             }
197         } else {
198             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
199         }
200     }
201
202     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
203         LOG.debug("Modification request {} completed with {}", request, response);
204
205         if (response instanceof TransactionSuccess) {
206             // Happy path
207             recordSuccessfulRequest(request);
208         } else {
209             recordFailedResponse(response);
210         }
211     }
212
213     private Exception recordFailedResponse(final Response<?, ?> response) {
214         final Exception failure;
215         if (response instanceof RequestFailure) {
216             failure = ((RequestFailure<?, ?>) response).getCause();
217         } else {
218             LOG.warn("Unhandled response {}", response);
219             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
220         }
221
222         if (operationFailure == null) {
223             LOG.debug("Transaction {} failed", getIdentifier(), failure);
224             operationFailure = failure;
225         }
226         return failure;
227     }
228
229     private void failFuture(final SettableFuture<?> future, final Response<?, ?> response) {
230         future.setException(recordFailedResponse(response));
231     }
232
233     private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
234         LOG.debug("Exists request completed with {}", response);
235
236         if (response instanceof ExistsTransactionSuccess) {
237             future.set(((ExistsTransactionSuccess) response).getExists());
238         } else {
239             failFuture(future, response);
240         }
241
242         recordFinishedRequest();
243     }
244
245     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
246             final Response<?, ?> response) {
247         LOG.debug("Read request completed with {}", response);
248
249         if (response instanceof ReadTransactionSuccess) {
250             future.set(((ReadTransactionSuccess) response).getData());
251         } else {
252             failFuture(future, response);
253         }
254
255         recordFinishedRequest();
256     }
257
258     private ModifyTransactionRequest abortRequest() {
259         ensureInitializedBuilder();
260         builder.setAbort();
261         final ModifyTransactionRequest ret = builder.build();
262         builderBusy = false;
263         return ret;
264     }
265
266     @Override
267     ModifyTransactionRequest commitRequest(final boolean coordinated) {
268         ensureInitializedBuilder();
269         builder.setCommit(coordinated);
270
271         final ModifyTransactionRequest ret = builder.build();
272         builderBusy = false;
273         return ret;
274     }
275
276     @Override
277     void doSeal() {
278         if (sendReadyOnSeal) {
279             ensureInitializedBuilder();
280             builder.setReady();
281             flushBuilder();
282         }
283     }
284
285     @Override
286     void flushState(final AbstractProxyTransaction successor) {
287         if (builderBusy) {
288             final ModifyTransactionRequest request = builder.build();
289             builderBusy = false;
290             forwardToSuccessor(successor, request, null);
291         }
292     }
293
294     @Override
295     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
296             final Consumer<Response<?, ?>> callback) {
297         successor.handleForwardedRequest(request, callback);
298     }
299
300     private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
301         if (request instanceof ModifyTransactionRequest) {
302             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
303
304             req.getModifications().forEach(this::appendModification);
305
306             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
307             if (maybeProto.isPresent()) {
308                 ensureSealed();
309
310                 final TransactionRequest<?> tmp;
311                 switch (maybeProto.get()) {
312                     case ABORT:
313                         tmp = abortRequest();
314                         sendRequest(tmp, resp -> {
315                             completeModify(tmp, resp);
316                             callback.accept(resp);
317                         });
318                         break;
319                     case SIMPLE:
320                         tmp = commitRequest(false);
321                         sendRequest(tmp, resp -> {
322                             completeModify(tmp, resp);
323                             callback.accept(resp);
324                         });
325                         break;
326                     case THREE_PHASE:
327                         tmp = commitRequest(true);
328                         sendRequest(tmp, resp -> {
329                             recordSuccessfulRequest(tmp);
330                             callback.accept(resp);
331                         });
332                         break;
333                     case READY:
334                         //no op
335                         break;
336                     default:
337                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
338                 }
339             }
340         } else if (request instanceof ReadTransactionRequest) {
341             ensureFlushedBuider();
342             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
343                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
344                     recordFinishedRequest();
345                     callback.accept(resp);
346                 });
347         } else if (request instanceof ExistsTransactionRequest) {
348             ensureFlushedBuider();
349             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
350                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
351                     recordFinishedRequest();
352                     callback.accept(resp);
353                 });
354         } else if (request instanceof TransactionPreCommitRequest) {
355             ensureFlushedBuider();
356             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
357                 localActor());
358             sendRequest(tmp, resp -> {
359                 recordSuccessfulRequest(tmp);
360                 callback.accept(resp);
361             });
362         } else if (request instanceof TransactionDoCommitRequest) {
363             ensureFlushedBuider();
364             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
365         } else if (request instanceof TransactionAbortRequest) {
366             ensureFlushedBuider();
367             sendAbort(callback);
368         } else if (request instanceof TransactionPurgeRequest) {
369             sendPurge();
370         } else {
371             throw new IllegalArgumentException("Unhandled request {}" + request);
372         }
373     }
374
375     @Override
376     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
377             final Consumer<Response<?, ?>> callback) {
378         successor.handleForwardedRemoteRequest(request, callback);
379     }
380
381     @Override
382     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
383             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
384         if (request instanceof CommitLocalTransactionRequest) {
385             replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks);
386         } else if (request instanceof AbortLocalTransactionRequest) {
387             enqueueRequest(abortRequest(), callback, enqueuedTicks);
388         } else {
389             throw new IllegalStateException("Unhandled request " + request);
390         }
391     }
392
393     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
394             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
395         final DataTreeModification mod = request.getModification();
396         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
397
398         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
399             @Override
400             public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
401                 appendModification(new TransactionWrite(current().node(child), data), optTicks);
402             }
403
404             @Override
405             public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
406                 appendModification(new TransactionMerge(current().node(child), data), optTicks);
407             }
408
409             @Override
410             public void delete(final PathArgument child) {
411                 appendModification(new TransactionDelete(current().node(child)), optTicks);
412             }
413         });
414
415         enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
416     }
417
418     @Override
419     void handleReplayedRemoteRequest(final TransactionRequest<?> request,
420             final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
421         final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
422         final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
423
424         if (request instanceof ModifyTransactionRequest) {
425             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
426             for (TransactionModification mod : req.getModifications()) {
427                 appendModification(mod, optTicks);
428             }
429
430             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
431             if (maybeProto.isPresent()) {
432                 ensureSealed();
433
434                 final TransactionRequest<?> tmp;
435                 switch (maybeProto.get()) {
436                     case ABORT:
437                         tmp = abortRequest();
438                         enqueueRequest(tmp, resp -> {
439                             completeModify(tmp, resp);
440                             cb.accept(resp);
441                         }, enqueuedTicks);
442                         break;
443                     case SIMPLE:
444                         tmp = commitRequest(false);
445                         enqueueRequest(tmp, resp -> {
446                             completeModify(tmp, resp);
447                             cb.accept(resp);
448                         }, enqueuedTicks);
449                         break;
450                     case THREE_PHASE:
451                         tmp = commitRequest(true);
452                         enqueueRequest(tmp, resp -> {
453                             recordSuccessfulRequest(tmp);
454                             cb.accept(resp);
455                         }, enqueuedTicks);
456                         break;
457                     case READY:
458                         //no op
459                         break;
460                     default:
461                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
462                 }
463             }
464         } else if (request instanceof ReadTransactionRequest) {
465             ensureFlushedBuider(optTicks);
466             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
467                 ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
468                     recordFinishedRequest();
469                     cb.accept(resp);
470                 }, enqueuedTicks);
471         } else if (request instanceof ExistsTransactionRequest) {
472             ensureFlushedBuider(optTicks);
473             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
474                 ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> {
475                     recordFinishedRequest();
476                     cb.accept(resp);
477                 }, enqueuedTicks);
478         } else if (request instanceof TransactionPreCommitRequest) {
479             ensureFlushedBuider(optTicks);
480             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
481                 localActor());
482             enqueueRequest(tmp, resp -> {
483                 recordSuccessfulRequest(tmp);
484                 cb.accept(resp);
485             }, enqueuedTicks);
486         } else if (request instanceof TransactionDoCommitRequest) {
487             ensureFlushedBuider(optTicks);
488             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
489                 enqueuedTicks);
490         } else if (request instanceof TransactionAbortRequest) {
491             ensureFlushedBuider(optTicks);
492             enqueueAbort(callback, enqueuedTicks);
493         } else if (request instanceof TransactionPurgeRequest) {
494             enqueuePurge(enqueuedTicks);
495         } else {
496             throw new IllegalArgumentException("Unhandled request {}" + request);
497         }
498     }
499 }