7f5bec1ff6c25dd7dd0650217e327e96a82c9e5c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayDeque;
19 import java.util.Deque;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.function.Consumer;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import javax.annotation.concurrent.NotThreadSafe;
26 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.RequestException;
35 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.mdsal.common.api.ReadFailedException;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Class translating transaction operations towards a particular backend shard.
47  *
48  * <p>
49  * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
50  * transitions coming from interactions with backend are expected to be thread-safe.
51  *
52  * <p>
53  * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
54  * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
55  *
56  * @author Robert Varga
57  */
58 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
59     /**
60      * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
61      * and allows compressing multiple requests into a single entry.
62      */
63     @NotThreadSafe
64     private static final class IncrementSequence {
65         private long delta = 1;
66
67         long getDelta() {
68             return delta;
69         }
70
71         void incrementDelta() {
72             delta++;
73         }
74     }
75
76     private enum SealState {
77         /**
78          * The user has not sealed the transaction yet.
79          */
80         OPEN,
81         /**
82          * The user has sealed the transaction, but has not issued a canCommit().
83          */
84         SEALED,
85         /**
86          * The user has sealed the transaction and has issued a canCommit().
87          */
88         FLUSHED,
89     }
90
91     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
92
93     private final Deque<Object> successfulRequests = new ArrayDeque<>();
94     private final ProxyHistory parent;
95
96     /*
97      * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
98      * the backend -- which may include a successor.
99      *
100      * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
101      * the successor placement needs to be atomic with regard to the application thread.
102      *
103      * In the common case, the application thread performs performs the seal operations and then "immediately" sends
104      * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
105      * or timeout, when a successor is injected.
106      *
107      * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
108      * after a successor was injected, so that it can be properly sealed if we are racing.
109      */
110     private volatile SealState sealed = SealState.OPEN;
111     @GuardedBy("this")
112     private AbstractProxyTransaction successor;
113     @GuardedBy("this")
114     private CountDownLatch successorLatch;
115
116     // Accessed from user thread only, which may not access this object concurrently
117     private long sequence;
118
119
120     AbstractProxyTransaction(final ProxyHistory parent) {
121         this.parent = Preconditions.checkNotNull(parent);
122     }
123
124     final ActorRef localActor() {
125         return parent.localActor();
126     }
127
128     private void incrementSequence(final long delta) {
129         sequence += delta;
130         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
131     }
132
133     final long nextSequence() {
134         final long ret = sequence++;
135         LOG.debug("Transaction {} allocated sequence {}", this, ret);
136         return ret;
137     }
138
139     final void delete(final YangInstanceIdentifier path) {
140         checkNotSealed();
141         doDelete(path);
142     }
143
144     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
145         checkNotSealed();
146         doMerge(path, data);
147     }
148
149     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
150         checkNotSealed();
151         doWrite(path, data);
152     }
153
154     final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
155         checkNotSealed();
156         return doExists(path);
157     }
158
159     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
160         checkNotSealed();
161         return doRead(path);
162     }
163
164     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
165         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
166         parent.sendRequest(request, callback);
167     }
168
169     /**
170      * Seal this transaction before it is either committed or aborted.
171      */
172     final void seal() {
173         final CountDownLatch localLatch;
174
175         synchronized (this) {
176             checkNotSealed();
177             doSeal();
178
179             // Fast path: no successor
180             if (successor == null) {
181                 sealed = SealState.SEALED;
182                 parent.onTransactionSealed(this);
183                 return;
184             }
185
186             localLatch = successorLatch;
187         }
188
189         // Slow path: wait for the latch
190         LOG.debug("{} waiting on successor latch", getIdentifier());
191         try {
192             localLatch.await();
193         } catch (InterruptedException e) {
194             LOG.warn("{} interrupted while waiting for latch", getIdentifier());
195             throw Throwables.propagate(e);
196         }
197
198         synchronized (this) {
199             LOG.debug("{} reacquired lock", getIdentifier());
200
201             flushState(successor);
202             successor.seal();
203
204             sealed = SealState.FLUSHED;
205             parent.onTransactionSealed(this);
206         }
207     }
208
209     private void checkNotSealed() {
210         Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier());
211     }
212
213     private SealState checkSealed() {
214         final SealState local = sealed;
215         Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier());
216         return local;
217     }
218
219     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
220         successfulRequests.add(Verify.verifyNotNull(req));
221     }
222
223     final void recordFinishedRequest() {
224         final Object last = successfulRequests.peekLast();
225         if (last instanceof IncrementSequence) {
226             ((IncrementSequence) last).incrementDelta();
227         } else {
228             successfulRequests.addLast(new IncrementSequence());
229         }
230     }
231
232     /**
233      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
234      * being sent to the backend.
235      */
236     final void abort() {
237         checkNotSealed();
238         doAbort();
239         parent.abortTransaction(this);
240     }
241
242     final void abort(final VotingFuture<Void> ret) {
243         checkSealed();
244
245         sendAbort(t -> {
246             if (t instanceof TransactionAbortSuccess) {
247                 ret.voteYes();
248             } else if (t instanceof RequestFailure) {
249                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
250             } else {
251                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
252             }
253
254             // This is a terminal request, hence we do not need to record it
255             LOG.debug("Transaction {} abort completed", this);
256             parent.completeTransaction(this);
257         });
258     }
259
260     final void sendAbort(final Consumer<Response<?, ?>> callback) {
261         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
262     }
263
264     /**
265      * Commit this transaction, possibly in a coordinated fashion.
266      *
267      * @param coordinated True if this transaction should be coordinated across multiple participants.
268      * @return Future completion
269      */
270     final ListenableFuture<Boolean> directCommit() {
271         final CountDownLatch localLatch;
272
273         synchronized (this) {
274             final SealState local = checkSealed();
275
276             // Fast path: no successor asserted
277             if (successor == null) {
278                 Verify.verify(local == SealState.SEALED);
279
280                 final SettableFuture<Boolean> ret = SettableFuture.create();
281                 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
282                     if (t instanceof TransactionCommitSuccess) {
283                         ret.set(Boolean.TRUE);
284                     } else if (t instanceof RequestFailure) {
285                         ret.setException(((RequestFailure<?, ?>) t).getCause());
286                     } else {
287                         ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
288                     }
289
290                     // This is a terminal request, hence we do not need to record it
291                     LOG.debug("Transaction {} directCommit completed", this);
292                     parent.completeTransaction(this);
293                 });
294
295                 sealed = SealState.FLUSHED;
296                 return ret;
297             }
298
299             // We have a successor, take its latch
300             localLatch = successorLatch;
301         }
302
303         // Slow path: we need to wait for the successor to completely propagate
304         LOG.debug("{} waiting on successor latch", getIdentifier());
305         try {
306             localLatch.await();
307         } catch (InterruptedException e) {
308             LOG.warn("{} interrupted while waiting for latch", getIdentifier());
309             throw Throwables.propagate(e);
310         }
311
312         synchronized (this) {
313             LOG.debug("{} reacquired lock", getIdentifier());
314
315             final SealState local = sealed;
316             Verify.verify(local == SealState.FLUSHED);
317
318             return successor.directCommit();
319         }
320     }
321
322     final void canCommit(final VotingFuture<?> ret) {
323         final CountDownLatch localLatch;
324
325         synchronized (this) {
326             final SealState local = checkSealed();
327
328             // Fast path: no successor asserted
329             if (successor == null) {
330                 Verify.verify(local == SealState.SEALED);
331
332                 final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
333                 sendRequest(req, t -> {
334                     if (t instanceof TransactionCanCommitSuccess) {
335                         ret.voteYes();
336                     } else if (t instanceof RequestFailure) {
337                         ret.voteNo(((RequestFailure<?, ?>) t).getCause());
338                     } else {
339                         ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
340                     }
341
342                     recordSuccessfulRequest(req);
343                     LOG.debug("Transaction {} canCommit completed", this);
344                 });
345
346                 sealed = SealState.FLUSHED;
347                 return;
348             }
349
350             // We have a successor, take its latch
351             localLatch = successorLatch;
352         }
353
354         // Slow path: we need to wait for the successor to completely propagate
355         LOG.debug("{} waiting on successor latch", getIdentifier());
356         try {
357             localLatch.await();
358         } catch (InterruptedException e) {
359             LOG.warn("{} interrupted while waiting for latch", getIdentifier());
360             throw Throwables.propagate(e);
361         }
362
363         synchronized (this) {
364             LOG.debug("{} reacquired lock", getIdentifier());
365
366             final SealState local = sealed;
367             Verify.verify(local == SealState.FLUSHED);
368
369             successor.canCommit(ret);
370         }
371     }
372
373     final void preCommit(final VotingFuture<?> ret) {
374         checkSealed();
375
376         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
377             localActor());
378         sendRequest(req, t -> {
379             if (t instanceof TransactionPreCommitSuccess) {
380                 ret.voteYes();
381             } else if (t instanceof RequestFailure) {
382                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
383             } else {
384                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
385             }
386
387             recordSuccessfulRequest(req);
388             LOG.debug("Transaction {} preCommit completed", this);
389         });
390     }
391
392     void doCommit(final VotingFuture<?> ret) {
393         checkSealed();
394
395         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
396             if (t instanceof TransactionCommitSuccess) {
397                 ret.voteYes();
398             } else if (t instanceof RequestFailure) {
399                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
400             } else {
401                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
402             }
403
404             LOG.debug("Transaction {} doCommit completed", this);
405             parent.completeTransaction(this);
406         });
407     }
408
409     final synchronized void startReconnect(final AbstractProxyTransaction successor) {
410         Preconditions.checkState(this.successor == null);
411         this.successor = Preconditions.checkNotNull(successor);
412
413         for (Object obj : successfulRequests) {
414             if (obj instanceof TransactionRequest) {
415                 LOG.debug("Forwarding request {} to successor {}", obj, successor);
416                 successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
417             } else {
418                 Verify.verify(obj instanceof IncrementSequence);
419                 successor.incrementSequence(((IncrementSequence) obj).getDelta());
420             }
421         }
422         LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
423         successfulRequests.clear();
424
425         /*
426          * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed
427          * finishConnect().
428          */
429         successorLatch = new CountDownLatch(1);
430     }
431
432     final synchronized void finishReconnect() {
433         Preconditions.checkState(successorLatch != null);
434
435         if (sealed == SealState.SEALED) {
436             /*
437              * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current
438              * leftover state to the successor now.
439              */
440             flushState(successor);
441             successor.seal();
442             sealed = SealState.FLUSHED;
443         }
444
445         // All done, release the latch, unblocking seal() and canCommit()
446         successorLatch.countDown();
447     }
448
449     /**
450      * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
451      * and forwarded to the successor connection.
452      *
453      * @param request Request to be forwarded
454      * @param callback Original callback
455      * @throws RequestException when the request is unhandled by the successor
456      */
457     final synchronized void replayRequest(final TransactionRequest<?> request,
458             final Consumer<Response<?, ?>> callback) {
459         Preconditions.checkState(successor != null, "%s does not have a successor set", this);
460
461         if (successor instanceof LocalProxyTransaction) {
462             forwardToLocal((LocalProxyTransaction)successor, request, callback);
463         } else if (successor instanceof RemoteProxyTransaction) {
464             forwardToRemote((RemoteProxyTransaction)successor, request, callback);
465         } else {
466             throw new IllegalStateException("Unhandled successor " + successor);
467         }
468     }
469
470     abstract void doDelete(final YangInstanceIdentifier path);
471
472     abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
473
474     abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
475
476     abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
477
478     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
479             final YangInstanceIdentifier path);
480
481     abstract void doSeal();
482
483     abstract void doAbort();
484
485     @GuardedBy("this")
486     abstract void flushState(AbstractProxyTransaction successor);
487
488     abstract TransactionRequest<?> commitRequest(boolean coordinated);
489
490     /**
491      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
492      * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
493      * operations are packaged in the message.
494      *
495      * <p>
496      * Note: this method is invoked by the predecessor on the successor.
497      *
498      * @param request Request which needs to be forwarded
499      * @param callback Callback to be invoked once the request completes
500      */
501     abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
502             @Nullable Consumer<Response<?, ?>> callback);
503
504     /**
505      * Replay a request originating in this proxy to a successor remote proxy.
506      */
507     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
508             Consumer<Response<?, ?>> callback);
509
510     /**
511      * Replay a request originating in this proxy to a successor local proxy.
512      */
513     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
514             Consumer<Response<?, ?>> callback);
515 }