46af74a5630999e0c75f430955875754c274dcb5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Throwables;
15 import com.google.common.base.Verify;
16 import com.google.common.collect.Iterables;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayDeque;
21 import java.util.Deque;
22 import java.util.Iterator;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26 import java.util.function.Consumer;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import javax.annotation.concurrent.NotThreadSafe;
31 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
32 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
35 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
36 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
37 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
41 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
44 import org.opendaylight.controller.cluster.access.concepts.Response;
45 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
46 import org.opendaylight.mdsal.common.api.ReadFailedException;
47 import org.opendaylight.yangtools.concepts.Identifiable;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Class translating transaction operations towards a particular backend shard.
55  *
56  * <p>
57  * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
58  * transitions coming from interactions with backend are expected to be thread-safe.
59  *
60  * <p>
61  * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
62  * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
63  *
64  * @author Robert Varga
65  */
66 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
67     /**
68      * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
69      * and allows compressing multiple requests into a single entry.
70      */
71     @NotThreadSafe
72     private static final class IncrementSequence {
73         private long delta = 1;
74
75         long getDelta() {
76             return delta;
77         }
78
79         void incrementDelta() {
80             delta++;
81         }
82     }
83
84     // Generic state base class. Direct instances are used for fast paths, sub-class is used for successor transitions
85     private static class State {
86         private final String string;
87
88         State(final String string) {
89             this.string = Preconditions.checkNotNull(string);
90         }
91
92         @Override
93         public final String toString() {
94             return string;
95         }
96     }
97
98     // State class used when a successor has interfered. Contains coordinator latch, the successor and previous state
99     private static final class SuccessorState extends State {
100         private final CountDownLatch latch = new CountDownLatch(1);
101         private AbstractProxyTransaction successor;
102         private State prevState;
103
104         SuccessorState() {
105             super("successor");
106         }
107
108         // Synchronize with succession process and return the successor
109         AbstractProxyTransaction await() {
110             try {
111                 latch.await();
112             } catch (InterruptedException e) {
113                 LOG.warn("Interrupted while waiting for latch of {}", successor);
114                 throw Throwables.propagate(e);
115             }
116             return successor;
117         }
118
119         void finish() {
120             latch.countDown();
121         }
122
123         State getPrevState() {
124             return prevState;
125         }
126
127         void setPrevState(final State prevState) {
128             Verify.verify(this.prevState == null);
129             this.prevState = Preconditions.checkNotNull(prevState);
130         }
131
132         // To be called from safe contexts, where successor is known to be completed
133         AbstractProxyTransaction getSuccessor() {
134             return Verify.verifyNotNull(successor);
135         }
136
137         void setSuccessor(final AbstractProxyTransaction successor) {
138             Verify.verify(this.successor == null);
139             this.successor = Preconditions.checkNotNull(successor);
140         }
141     }
142
143     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
144     private static final AtomicIntegerFieldUpdater<AbstractProxyTransaction> SEALED_UPDATER =
145             AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed");
146     private static final AtomicReferenceFieldUpdater<AbstractProxyTransaction, State> STATE_UPDATER =
147             AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state");
148     private static final State OPEN = new State("open");
149     private static final State SEALED = new State("sealed");
150     private static final State FLUSHED = new State("flushed");
151
152     // Touched from client actor thread only
153     private final Deque<Object> successfulRequests = new ArrayDeque<>();
154     private final ProxyHistory parent;
155
156     // Accessed from user thread only, which may not access this object concurrently
157     private long sequence;
158
159     /*
160      * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
161      * the backend -- which may include a successor.
162      *
163      * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
164      * the successor placement needs to be atomic with regard to the application thread.
165      *
166      * In the common case, the application thread performs performs the seal operations and then "immediately" sends
167      * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
168      * or timeout, when a successor is injected.
169      *
170      * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
171      * after a successor was injected, so that it can be properly sealed if we are racing. Further complication comes
172      * from lock ordering, where the successor injection works with a locked queue and locks proxy objects -- leading
173      * to a potential AB-BA deadlock in case of a naive implementation.
174      *
175      * For tracking user-visible state we use a single volatile int, which is flipped atomically from 0 to 1 exactly
176      * once in {@link AbstractProxyTransaction#seal()}. That keeps common operations fast, as they need to perform
177      * only a single volatile read to assert state correctness.
178      *
179      * For synchronizing client actor (successor-injecting) and user (commit-driving) thread, we keep a separate state
180      * variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
181      * for slow paths (when successor is injected/present).
182      */
183     private volatile int sealed = 0;
184     private volatile State state = OPEN;
185
186     AbstractProxyTransaction(final ProxyHistory parent) {
187         this.parent = Preconditions.checkNotNull(parent);
188     }
189
190     final ActorRef localActor() {
191         return parent.localActor();
192     }
193
194     private void incrementSequence(final long delta) {
195         sequence += delta;
196         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
197     }
198
199     final long nextSequence() {
200         final long ret = sequence++;
201         LOG.debug("Transaction {} allocated sequence {}", this, ret);
202         return ret;
203     }
204
205     final void delete(final YangInstanceIdentifier path) {
206         checkReadWrite();
207         checkNotSealed();
208         doDelete(path);
209     }
210
211     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
212         checkReadWrite();
213         checkNotSealed();
214         doMerge(path, data);
215     }
216
217     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
218         checkReadWrite();
219         checkNotSealed();
220         doWrite(path, data);
221     }
222
223     final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
224         checkNotSealed();
225         return doExists(path);
226     }
227
228     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
229         checkNotSealed();
230         return doRead(path);
231     }
232
233     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
234             final long enqueuedTicks) {
235         LOG.debug("Transaction proxy {} enqueing request {} callback {}", this, request, callback);
236         parent.enqueueRequest(request, callback, enqueuedTicks);
237     }
238
239     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
240         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
241         parent.sendRequest(request, callback);
242     }
243
244     /**
245      * Seal this transaction before it is either committed or aborted.
246      */
247     final void seal() {
248         // Transition user-visible state first
249         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
250         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
251         internalSeal();
252     }
253
254     final void ensureSealed() {
255         if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
256             internalSeal();
257         }
258     }
259
260     private void internalSeal() {
261         doSeal();
262         parent.onTransactionSealed(this);
263
264         // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit().
265         if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) {
266             // Slow path: wait for the successor to complete
267             final AbstractProxyTransaction successor = awaitSuccessor();
268
269             // At this point the successor has completed transition and is possibly visible by the user thread, which is
270             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
271             // Propagate state and seal the successor.
272             flushState(successor);
273             successor.ensureSealed();
274         }
275     }
276
277     private void checkNotSealed() {
278         Preconditions.checkState(sealed == 0, "Transaction %s has already been sealed", getIdentifier());
279     }
280
281     private void checkSealed() {
282         Preconditions.checkState(sealed != 0, "Transaction %s has not been sealed yet", getIdentifier());
283     }
284
285     private SuccessorState getSuccessorState() {
286         final State local = state;
287         Verify.verify(local instanceof SuccessorState, "State %s has unexpected class", local);
288         return (SuccessorState) local;
289     }
290
291     private void checkReadWrite() {
292         if (isSnapshotOnly()) {
293             throw new UnsupportedOperationException("Transaction " + getIdentifier() + " is a read-only snapshot");
294         }
295     }
296
297     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
298         successfulRequests.add(Verify.verifyNotNull(req));
299     }
300
301     final void recordFinishedRequest() {
302         final Object last = successfulRequests.peekLast();
303         if (last instanceof IncrementSequence) {
304             ((IncrementSequence) last).incrementDelta();
305         } else {
306             successfulRequests.addLast(new IncrementSequence());
307         }
308     }
309
310     /**
311      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
312      * being sent to the backend.
313      */
314     final void abort() {
315         checkNotSealed();
316         doAbort();
317         parent.abortTransaction(this);
318     }
319
320     final void abort(final VotingFuture<Void> ret) {
321         checkSealed();
322
323         sendAbort(t -> {
324             if (t instanceof TransactionAbortSuccess) {
325                 ret.voteYes();
326             } else if (t instanceof RequestFailure) {
327                 ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
328             } else {
329                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
330             }
331
332             // This is a terminal request, hence we do not need to record it
333             LOG.debug("Transaction {} abort completed", this);
334             sendPurge();
335         });
336     }
337
338     final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
339         enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
340             enqueuedTicks);
341     }
342
343     final void sendAbort(final Consumer<Response<?, ?>> callback) {
344         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
345     }
346
347     /**
348      * Commit this transaction, possibly in a coordinated fashion.
349      *
350      * @param coordinated True if this transaction should be coordinated across multiple participants.
351      * @return Future completion
352      */
353     final ListenableFuture<Boolean> directCommit() {
354         checkReadWrite();
355         checkSealed();
356
357         // Precludes startReconnect() from interfering with the fast path
358         synchronized (this) {
359             if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) {
360                 final SettableFuture<Boolean> ret = SettableFuture.create();
361                 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
362                     if (t instanceof TransactionCommitSuccess) {
363                         ret.set(Boolean.TRUE);
364                     } else if (t instanceof RequestFailure) {
365                         ret.setException(((RequestFailure<?, ?>) t).getCause().unwrap());
366                     } else {
367                         ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
368                     }
369
370                     // This is a terminal request, hence we do not need to record it
371                     LOG.debug("Transaction {} directCommit completed", this);
372                     sendPurge();
373                 });
374
375                 return ret;
376             }
377         }
378
379         // We have had some interference with successor injection, wait for it to complete and defer to the successor.
380         return awaitSuccessor().directCommit();
381     }
382
383     final void canCommit(final VotingFuture<?> ret) {
384         checkReadWrite();
385         checkSealed();
386
387         // Precludes startReconnect() from interfering with the fast path
388         synchronized (this) {
389             if (STATE_UPDATER.compareAndSet(this, SEALED, FLUSHED)) {
390                 final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
391
392                 sendRequest(req, t -> {
393                     if (t instanceof TransactionCanCommitSuccess) {
394                         ret.voteYes();
395                     } else if (t instanceof RequestFailure) {
396                         ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
397                     } else {
398                         ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
399                     }
400
401                     recordSuccessfulRequest(req);
402                     LOG.debug("Transaction {} canCommit completed", this);
403                 });
404
405                 return;
406             }
407         }
408
409         // We have had some interference with successor injection, wait for it to complete and defer to the successor.
410         awaitSuccessor().canCommit(ret);
411     }
412
413     private AbstractProxyTransaction awaitSuccessor() {
414         return getSuccessorState().await();
415     }
416
417     final void preCommit(final VotingFuture<?> ret) {
418         checkReadWrite();
419         checkSealed();
420
421         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
422             localActor());
423         sendRequest(req, t -> {
424             if (t instanceof TransactionPreCommitSuccess) {
425                 ret.voteYes();
426             } else if (t instanceof RequestFailure) {
427                 ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
428             } else {
429                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
430             }
431
432             onPreCommitComplete(req);
433         });
434     }
435
436     private void onPreCommitComplete(final TransactionRequest<?> req) {
437         /*
438          * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
439          * to storage after the timeout completes.
440          *
441          * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
442          * the precommit request, so we know which request to use for resync.
443          */
444         LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
445         successfulRequests.clear();
446
447         // TODO: this works, but can contain some useless state (like batched operations). Create an empty
448         //       equivalent of this request and store that.
449         recordSuccessfulRequest(req);
450     }
451
452     final void doCommit(final VotingFuture<?> ret) {
453         checkReadWrite();
454         checkSealed();
455
456         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
457             if (t instanceof TransactionCommitSuccess) {
458                 ret.voteYes();
459             } else if (t instanceof RequestFailure) {
460                 ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
461             } else {
462                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
463             }
464
465             LOG.debug("Transaction {} doCommit completed", this);
466             sendPurge();
467         });
468     }
469
470     final void sendPurge() {
471         successfulRequests.clear();
472
473         final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
474         sendRequest(req, t -> {
475             LOG.debug("Transaction {} purge completed", this);
476             parent.completeTransaction(this);
477         });
478     }
479
480     final void enqueuePurge(final long enqueuedTicks) {
481         successfulRequests.clear();
482
483         final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
484         enqueueRequest(req, t -> {
485             LOG.debug("Transaction {} purge completed", this);
486             parent.completeTransaction(this);
487         }, enqueuedTicks);
488     }
489
490     // Called with the connection unlocked
491     final synchronized void startReconnect() {
492         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
493         // state. This method is called with the queue still unlocked.
494         final SuccessorState nextState = new SuccessorState();
495         final State prevState = STATE_UPDATER.getAndSet(this, nextState);
496
497         LOG.debug("Start reconnect of proxy {} previous state {}", this, prevState);
498         Verify.verify(!(prevState instanceof SuccessorState), "Proxy %s duplicate reconnect attempt after %s", this,
499             prevState);
500
501         // We have asserted a slow-path state, seal(), canCommit(), directCommit() are forced to slow paths, which will
502         // wait until we unblock nextState's latch before accessing state. Now we record prevState for later use and we
503         // are done.
504         nextState.setPrevState(prevState);
505     }
506
507     // Called with the connection locked
508     final void replayMessages(final AbstractProxyTransaction successor,
509             final Iterable<ConnectionEntry> enqueuedEntries) {
510         final SuccessorState local = getSuccessorState();
511         local.setSuccessor(successor);
512
513         // Replay successful requests first
514         if (!successfulRequests.isEmpty()) {
515             // We need to find a good timestamp to use for successful requests, as we do not want to time them out
516             // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
517             // time. We will pick the time of the first entry available. If there is none, we will just use current
518             // time, as all other requests will get enqueued afterwards.
519             final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
520             final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
521
522             for (Object obj : successfulRequests) {
523                 if (obj instanceof TransactionRequest) {
524                     LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
525                     successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
526                 } else {
527                     Verify.verify(obj instanceof IncrementSequence);
528                     successor.incrementSequence(((IncrementSequence) obj).getDelta());
529                 }
530             }
531             LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
532             successfulRequests.clear();
533         }
534
535         // Now replay whatever is in the connection
536         final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
537         while (it.hasNext()) {
538             final ConnectionEntry e = it.next();
539             final Request<?, ?> req = e.getRequest();
540
541             if (getIdentifier().equals(req.getTarget())) {
542                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
543                 LOG.debug("Replaying queued request {} to successor {}", req, successor);
544                 successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
545                 it.remove();
546             }
547         }
548
549         /*
550          * Check the state at which we have started the reconnect attempt. State transitions triggered while we were
551          * reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
552          * at the end of this method.
553          */
554         final State prevState = local.getPrevState();
555         if (SEALED.equals(prevState)) {
556             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
557             flushState(successor);
558             successor.ensureSealed();
559         }
560     }
561
562     /**
563      * Invoked from {@link #replayMessages(AbstractProxyTransaction, Iterable)} to have successor adopt an in-flight
564      * request.
565      *
566      * <p>
567      * Note: this method is invoked by the predecessor on the successor.
568      *
569      * @param request Request which needs to be forwarded
570      * @param callback Callback to be invoked once the request completes
571      * @param enqueuedTicks ticker-based time stamp when the request was enqueued
572      */
573     private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
574             final long enqueuedTicks) {
575         if (request instanceof AbstractLocalTransactionRequest) {
576             handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
577         } else {
578             handleReplayedRemoteRequest(request, callback, enqueuedTicks);
579         }
580     }
581
582     // Called with the connection locked
583     final void finishReconnect() {
584         final SuccessorState local = getSuccessorState();
585         LOG.debug("Finishing reconnect of proxy {}", this);
586
587         // All done, release the latch, unblocking seal() and canCommit() slow paths
588         local.finish();
589     }
590
591     /**
592      * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
593      * and forwarded to the successor connection.
594      *
595      * @param request Request to be forwarded
596      * @param callback Original callback
597      */
598     final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
599         forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback);
600     }
601
602     final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest<?> request,
603             final Consumer<Response<?, ?>> callback) {
604         if (successor instanceof LocalProxyTransaction) {
605             forwardToLocal((LocalProxyTransaction)successor, request, callback);
606         } else if (successor instanceof RemoteProxyTransaction) {
607             forwardToRemote((RemoteProxyTransaction)successor, request, callback);
608         } else {
609             throw new IllegalStateException("Unhandled successor " + successor);
610         }
611     }
612
613     abstract boolean isSnapshotOnly();
614
615     abstract void doDelete(YangInstanceIdentifier path);
616
617     abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
618
619     abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
620
621     abstract CheckedFuture<Boolean, ReadFailedException> doExists(YangInstanceIdentifier path);
622
623     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
624
625     abstract void doSeal();
626
627     abstract void doAbort();
628
629     @GuardedBy("this")
630     abstract void flushState(AbstractProxyTransaction successor);
631
632     abstract TransactionRequest<?> commitRequest(boolean coordinated);
633
634     /**
635      * Replay a request originating in this proxy to a successor remote proxy.
636      */
637     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
638             Consumer<Response<?, ?>> callback);
639
640     /**
641      * Replay a request originating in this proxy to a successor local proxy.
642      */
643     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
644             Consumer<Response<?, ?>> callback);
645
646     /**
647      * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor.
648      *
649      * <p>
650      * Note: this method is invoked by the predecessor on the successor.
651      *
652      * @param request Request which needs to be forwarded
653      * @param callback Callback to be invoked once the request completes
654      * @param enqueuedTicks Time stamp to use for enqueue time
655      */
656     abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest<?> request,
657             @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
658
659     /**
660      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
661      *
662      * <p>
663      * Note: this method is invoked by the predecessor on the successor.
664      *
665      * @param request Request which needs to be forwarded
666      * @param callback Callback to be invoked once the request completes
667      * @param enqueuedTicks Time stamp to use for enqueue time
668      */
669     abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
670             @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
671
672     @Override
673     public final String toString() {
674         return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
675     }
676 }