Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / RemoteProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Verify.verify;
11
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.Optional;
16 import java.util.OptionalLong;
17 import java.util.function.Consumer;
18 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
25 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
26 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
28 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
29 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
30 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
41 import org.opendaylight.controller.cluster.access.concepts.RequestException;
42 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
43 import org.opendaylight.controller.cluster.access.concepts.Response;
44 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
45 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
46 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
47 import org.opendaylight.mdsal.common.api.ReadFailedException;
48 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
58  * not known or is known to be not co-located with the client.
59  *
60  * <p>
61  * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
62  * maintaining any submitted operations until the leader is discovered.
63  *
64  * <p>
65  * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
66  * transitions based on backend responses are thread-safe.
67  */
68 final class RemoteProxyTransaction extends AbstractProxyTransaction {
69     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
70
71     private final ModifyTransactionRequestBuilder builder;
72     private final boolean sendReadyOnSeal;
73     private final boolean snapshotOnly;
74     private final int maxModifications;
75
76     private boolean builderBusy;
77
78     private volatile Exception operationFailure;
79
80     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
81             final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
82         super(parent, isDone);
83         this.snapshotOnly = snapshotOnly;
84         this.sendReadyOnSeal = sendReadyOnSeal;
85         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
86         maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount();
87     }
88
89     @Override
90     boolean isSnapshotOnly() {
91         return snapshotOnly;
92     }
93
94     @Override
95     public TransactionIdentifier getIdentifier() {
96         return builder.getIdentifier();
97     }
98
99     @Override
100     void doDelete(final YangInstanceIdentifier path) {
101         appendModification(new TransactionDelete(path), OptionalLong.empty());
102     }
103
104     @Override
105     void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) {
106         appendModification(new TransactionMerge(path, data), OptionalLong.empty());
107     }
108
109     @Override
110     void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) {
111         appendModification(new TransactionWrite(path, data), OptionalLong.empty());
112     }
113
114     private <T> FluentFuture<T> sendReadRequest(final AbstractReadTransactionRequest<?> request,
115             final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
116         // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
117         final Exception local = operationFailure;
118         if (local != null) {
119             return FluentFutures.immediateFailedFluentFuture(
120                     new ReadFailedException("Previous operation failed", local));
121         }
122
123         // Make sure we send any modifications before issuing a read
124         ensureFlushedBuider();
125         sendRequest(request, completer);
126         return FluentFuture.from(future);
127     }
128
129     @Override
130     FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
131         final SettableFuture<Boolean> future = SettableFuture.create();
132         return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
133             isSnapshotOnly()), t -> completeExists(path, future, t), future);
134     }
135
136     @Override
137     FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
138         final SettableFuture<Optional<NormalizedNode>> future = SettableFuture.create();
139         return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path,
140             isSnapshotOnly()), t -> completeRead(path, future, t), future);
141     }
142
143     private void ensureInitializedBuilder() {
144         if (!builderBusy) {
145             builder.setSequence(nextSequence());
146             builderBusy = true;
147         }
148     }
149
150     private void ensureFlushedBuider() {
151         ensureFlushedBuider(OptionalLong.empty());
152     }
153
154     private void ensureFlushedBuider(final OptionalLong enqueuedTicks) {
155         if (builderBusy) {
156             flushBuilder(enqueuedTicks);
157         }
158     }
159
160     private void flushBuilder(final OptionalLong enqueuedTicks) {
161         final ModifyTransactionRequest request = builder.build();
162         builderBusy = false;
163
164         sendModification(request, enqueuedTicks);
165     }
166
167     private void sendModification(final TransactionRequest<?> request, final OptionalLong enqueuedTicks) {
168         if (enqueuedTicks.isPresent()) {
169             enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.orElseThrow());
170         } else {
171             sendRequest(request, response -> completeModify(request, response));
172         }
173     }
174
175     private void appendModification(final TransactionModification modification) {
176         appendModification(modification, OptionalLong.empty());
177     }
178
179     private void appendModification(final TransactionModification modification, final OptionalLong enqueuedTicks) {
180         if (operationFailure == null) {
181             ensureInitializedBuilder();
182
183             builder.addModification(modification);
184             if (builder.size() >= maxModifications) {
185                 flushBuilder(enqueuedTicks);
186             }
187         } else {
188             LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
189         }
190     }
191
192     private void completeModify(final TransactionRequest<?> request, final Response<?, ?> response) {
193         LOG.debug("Modification request {} completed with {}", request, response);
194
195         if (response instanceof TransactionSuccess) {
196             // Happy path
197             recordSuccessfulRequest(request);
198         } else {
199             recordFailedResponse(response);
200         }
201     }
202
203     private Exception recordFailedResponse(final Response<?, ?> response) {
204         final Exception failure;
205         if (response instanceof RequestFailure<?, ?> requestFailure) {
206             final RequestException cause = requestFailure.getCause();
207             failure = cause instanceof RequestTimeoutException
208                     ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause;
209         } else {
210             LOG.warn("Unhandled response {}", response);
211             failure = new IllegalArgumentException("Unhandled response " + response.getClass());
212         }
213
214         if (operationFailure == null) {
215             LOG.debug("Transaction {} failed", getIdentifier(), failure);
216             operationFailure = failure;
217         }
218         return failure;
219     }
220
221     private void failReadFuture(final SettableFuture<?> future, final String message,
222             final Response<?, ?> response) {
223         future.setException(new ReadFailedException(message, recordFailedResponse(response)));
224     }
225
226     private void completeExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> future,
227             final Response<?, ?> response) {
228         LOG.debug("Exists request for {} completed with {}", path, response);
229
230         if (response instanceof ExistsTransactionSuccess success) {
231             future.set(success.getExists());
232         } else {
233             failReadFuture(future, "Error executing exists request for path " + path, response);
234         }
235
236         recordFinishedRequest(response);
237     }
238
239     private void completeRead(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode>> future,
240             final Response<?, ?> response) {
241         LOG.debug("Read request for {} completed with {}", path, response);
242
243         if (response instanceof ReadTransactionSuccess success) {
244             future.set(success.getData());
245         } else {
246             failReadFuture(future, "Error reading data for path " + path, response);
247         }
248
249         recordFinishedRequest(response);
250     }
251
252     @Override
253     ModifyTransactionRequest abortRequest() {
254         ensureInitializedBuilder();
255         builder.setAbort();
256         builderBusy = false;
257         return builder.build();
258     }
259
260     @Override
261     ModifyTransactionRequest commitRequest(final boolean coordinated) {
262         ensureInitializedBuilder();
263         builder.setCommit(coordinated);
264         builderBusy = false;
265         return builder.build();
266     }
267
268     private ModifyTransactionRequest readyRequest() {
269         ensureInitializedBuilder();
270         builder.setReady();
271         builderBusy = false;
272         return builder.build();
273     }
274
275     @Override
276     boolean sealAndSend(final OptionalLong enqueuedTicks) {
277         if (sendReadyOnSeal) {
278             ensureInitializedBuilder();
279             builder.setReady();
280             flushBuilder(enqueuedTicks);
281         }
282         return super.sealAndSend(enqueuedTicks);
283     }
284
285     @Override
286     Optional<ModifyTransactionRequest> flushState() {
287         if (!builderBusy) {
288             return Optional.empty();
289         }
290
291         final ModifyTransactionRequest request = builder.build();
292         builderBusy = false;
293         return Optional.of(request);
294     }
295
296     @Override
297     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
298             final Consumer<Response<?, ?>> callback) {
299         successor.handleForwardedRequest(request, callback);
300     }
301
302     void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
303         if (request instanceof ModifyTransactionRequest modifyRequest) {
304             handleForwardedModifyTransactionRequest(callback, modifyRequest);
305         } else if (request instanceof ReadTransactionRequest readRequest) {
306             ensureFlushedBuider();
307             sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
308                 readRequest.getPath(), isSnapshotOnly()), resp -> {
309                     recordFinishedRequest(resp);
310                     callback.accept(resp);
311                 });
312         } else if (request instanceof ExistsTransactionRequest existsRequest) {
313             ensureFlushedBuider();
314             sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
315                 existsRequest.getPath(), isSnapshotOnly()), resp -> {
316                     recordFinishedRequest(resp);
317                     callback.accept(resp);
318                 });
319         } else if (request instanceof TransactionPreCommitRequest) {
320             ensureFlushedBuider();
321             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
322                 localActor());
323             sendRequest(tmp, resp -> {
324                 recordSuccessfulRequest(tmp);
325                 callback.accept(resp);
326             });
327         } else if (request instanceof TransactionDoCommitRequest) {
328             ensureFlushedBuider();
329             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
330         } else if (request instanceof TransactionAbortRequest) {
331             ensureFlushedBuider();
332             sendDoAbort(callback);
333         } else if (request instanceof TransactionPurgeRequest) {
334             enqueuePurge(callback);
335         } else {
336             throw unhandledRequest(request);
337         }
338     }
339
340     private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
341             final ModifyTransactionRequest req) {
342         req.getModifications().forEach(this::appendModification);
343
344         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
345         if (maybeProto.isPresent()) {
346             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
347             // until we know what we are going to do.
348             if (markSealed()) {
349                 if (!sealOnly()) {
350                     LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this);
351                 }
352             }
353
354             final TransactionRequest<?> tmp;
355             switch (maybeProto.orElseThrow()) {
356                 case ABORT:
357                     tmp = abortRequest();
358                     sendRequest(tmp, resp -> {
359                         completeModify(tmp, resp);
360                         callback.accept(resp);
361                     });
362                     break;
363                 case SIMPLE:
364                     tmp = commitRequest(false);
365                     sendRequest(tmp, resp -> {
366                         completeModify(tmp, resp);
367                         callback.accept(resp);
368                     });
369                     break;
370                 case THREE_PHASE:
371                     tmp = commitRequest(true);
372                     sendRequest(tmp, resp -> {
373                         recordSuccessfulRequest(tmp);
374                         callback.accept(resp);
375                     });
376                     break;
377                 case READY:
378                     tmp = readyRequest();
379                     sendRequest(tmp, resp -> {
380                         recordSuccessfulRequest(tmp);
381                         callback.accept(resp);
382                     });
383                     break;
384                 default:
385                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
386             }
387         }
388     }
389
390     @Override
391     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
392             final Consumer<Response<?, ?>> callback) {
393         successor.handleForwardedRemoteRequest(request, callback);
394     }
395
396     @Override
397     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
398             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
399         if (request instanceof CommitLocalTransactionRequest commitRequest) {
400             replayLocalCommitRequest(commitRequest, callback, enqueuedTicks);
401         } else if (request instanceof AbortLocalTransactionRequest) {
402             enqueueRequest(abortRequest(), callback, enqueuedTicks);
403         } else {
404             throw unhandledRequest(request);
405         }
406     }
407
408     private void replayLocalCommitRequest(final CommitLocalTransactionRequest request,
409             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
410         final DataTreeModification mod = request.getModification();
411         final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
412
413         mod.applyToCursor(new AbstractDataTreeModificationCursor() {
414             @Override
415             public void write(final PathArgument child, final NormalizedNode data) {
416                 appendModification(new TransactionWrite(current().node(child), data), optTicks);
417             }
418
419             @Override
420             public void merge(final PathArgument child, final NormalizedNode data) {
421                 appendModification(new TransactionMerge(current().node(child), data), optTicks);
422             }
423
424             @Override
425             public void delete(final PathArgument child) {
426                 appendModification(new TransactionDelete(current().node(child)), optTicks);
427             }
428         });
429
430         enqueueRequest(commitRequest(request.isCoordinated()), callback, enqueuedTicks);
431     }
432
433     @Override
434     void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
435             final long enqueuedTicks) {
436         final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
437         final OptionalLong optTicks = OptionalLong.of(enqueuedTicks);
438
439         if (request instanceof ModifyTransactionRequest modifyRequest) {
440             handleReplayedModifyTransactionRequest(enqueuedTicks, cb, modifyRequest);
441         } else if (request instanceof ReadTransactionRequest readRequest) {
442             ensureFlushedBuider(optTicks);
443             enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
444                 readRequest.getPath(), isSnapshotOnly()), resp -> {
445                     recordFinishedRequest(resp);
446                     cb.accept(resp);
447                 }, enqueuedTicks);
448         } else if (request instanceof ExistsTransactionRequest existsRequest) {
449             ensureFlushedBuider(optTicks);
450             enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
451                 existsRequest.getPath(), isSnapshotOnly()), resp -> {
452                     recordFinishedRequest(resp);
453                     cb.accept(resp);
454                 }, enqueuedTicks);
455         } else if (request instanceof TransactionPreCommitRequest) {
456             ensureFlushedBuider(optTicks);
457             final TransactionRequest<?> tmp = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
458                 localActor());
459             enqueueRequest(tmp, resp -> {
460                 recordSuccessfulRequest(tmp);
461                 cb.accept(resp);
462             }, enqueuedTicks);
463         } else if (request instanceof TransactionDoCommitRequest) {
464             ensureFlushedBuider(optTicks);
465             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
466                 enqueuedTicks);
467         } else if (request instanceof TransactionAbortRequest) {
468             ensureFlushedBuider(optTicks);
469             enqueueDoAbort(callback, enqueuedTicks);
470         } else if (request instanceof TransactionPurgeRequest) {
471             enqueuePurge(callback, enqueuedTicks);
472         } else if (request instanceof IncrementTransactionSequenceRequest req) {
473             ensureFlushedBuider(optTicks);
474             enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(),
475                 snapshotOnly, req.getIncrement()), callback, enqueuedTicks);
476             incrementSequence(req.getIncrement());
477         } else {
478             throw unhandledRequest(request);
479         }
480     }
481
482     private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
483             final ModifyTransactionRequest req) {
484         req.getModifications().forEach(this::appendModification);
485
486         final Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
487         if (maybeProto.isPresent()) {
488             // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
489             // until we know what we are going to do.
490             if (markSealed()) {
491                 verify(sealOnly(), "Attempted to replay seal on %s", this);
492             }
493
494             final TransactionRequest<?> tmp;
495             switch (maybeProto.orElseThrow()) {
496                 case ABORT:
497                     tmp = abortRequest();
498                     enqueueRequest(tmp, resp -> {
499                         completeModify(tmp, resp);
500                         cb.accept(resp);
501                     }, enqueuedTicks);
502                     break;
503                 case SIMPLE:
504                     tmp = commitRequest(false);
505                     enqueueRequest(tmp, resp -> {
506                         completeModify(tmp, resp);
507                         cb.accept(resp);
508                     }, enqueuedTicks);
509                     break;
510                 case THREE_PHASE:
511                     tmp = commitRequest(true);
512                     enqueueRequest(tmp, resp -> {
513                         recordSuccessfulRequest(tmp);
514                         cb.accept(resp);
515                     }, enqueuedTicks);
516                     break;
517                 case READY:
518                     tmp = readyRequest();
519                     enqueueRequest(tmp, resp -> {
520                         recordSuccessfulRequest(tmp);
521                         cb.accept(resp);
522                     }, enqueuedTicks);
523                     break;
524                 default:
525                     throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow());
526             }
527         }
528     }
529 }