2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayDeque;
19 import java.util.Deque;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.function.Consumer;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import javax.annotation.concurrent.GuardedBy;
25 import javax.annotation.concurrent.NotThreadSafe;
26 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.RequestException;
35 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.mdsal.common.api.ReadFailedException;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Class translating transaction operations towards a particular backend shard.
49 * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
50 * transitions coming from interactions with backend are expected to be thread-safe.
53 * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
54 * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
56 * @author Robert Varga
58 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
60 * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
61 * and allows compressing multiple requests into a single entry.
64 private static final class IncrementSequence {
65 private long delta = 1;
71 void incrementDelta() {
76 private enum SealState {
78 * The user has not sealed the transaction yet.
82 * The user has sealed the transaction, but has not issued a canCommit().
86 * The user has sealed the transaction and has issued a canCommit().
91 private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
93 private final Deque<Object> successfulRequests = new ArrayDeque<>();
94 private final ProxyHistory parent;
97 * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
98 * the backend -- which may include a successor.
100 * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
101 * the successor placement needs to be atomic with regard to the application thread.
103 * In the common case, the application thread performs performs the seal operations and then "immediately" sends
104 * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
105 * or timeout, when a successor is injected.
107 * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
108 * after a successor was injected, so that it can be properly sealed if we are racing.
110 private volatile SealState sealed = SealState.OPEN;
112 private AbstractProxyTransaction successor;
114 private CountDownLatch successorLatch;
116 // Accessed from user thread only, which may not access this object concurrently
117 private long sequence;
120 AbstractProxyTransaction(final ProxyHistory parent) {
121 this.parent = Preconditions.checkNotNull(parent);
124 final ActorRef localActor() {
125 return parent.localActor();
128 private void incrementSequence(final long delta) {
130 LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
133 final long nextSequence() {
134 final long ret = sequence++;
135 LOG.debug("Transaction {} allocated sequence {}", this, ret);
139 final void delete(final YangInstanceIdentifier path) {
144 final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
149 final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
154 final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
156 return doExists(path);
159 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
164 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
165 LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
166 parent.sendRequest(request, callback);
170 * Seal this transaction before it is either committed or aborted.
173 final CountDownLatch localLatch;
175 synchronized (this) {
179 // Fast path: no successor
180 if (successor == null) {
181 sealed = SealState.SEALED;
182 parent.onTransactionSealed(this);
186 localLatch = successorLatch;
189 // Slow path: wait for the latch
190 LOG.debug("{} waiting on successor latch", getIdentifier());
193 } catch (InterruptedException e) {
194 LOG.warn("{} interrupted while waiting for latch", getIdentifier());
195 throw Throwables.propagate(e);
198 synchronized (this) {
199 LOG.debug("{} reacquired lock", getIdentifier());
201 flushState(successor);
204 sealed = SealState.FLUSHED;
205 parent.onTransactionSealed(this);
209 private void checkNotSealed() {
210 Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier());
213 private SealState checkSealed() {
214 final SealState local = sealed;
215 Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier());
219 final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
220 successfulRequests.add(Verify.verifyNotNull(req));
223 final void recordFinishedRequest() {
224 final Object last = successfulRequests.peekLast();
225 if (last instanceof IncrementSequence) {
226 ((IncrementSequence) last).incrementDelta();
228 successfulRequests.addLast(new IncrementSequence());
233 * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
234 * being sent to the backend.
239 parent.abortTransaction(this);
242 final void abort(final VotingFuture<Void> ret) {
246 if (t instanceof TransactionAbortSuccess) {
248 } else if (t instanceof RequestFailure) {
249 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
251 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
254 // This is a terminal request, hence we do not need to record it
255 LOG.debug("Transaction {} abort completed", this);
256 parent.completeTransaction(this);
260 final void sendAbort(final Consumer<Response<?, ?>> callback) {
261 sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
265 * Commit this transaction, possibly in a coordinated fashion.
267 * @param coordinated True if this transaction should be coordinated across multiple participants.
268 * @return Future completion
270 final ListenableFuture<Boolean> directCommit() {
271 final CountDownLatch localLatch;
273 synchronized (this) {
274 final SealState local = checkSealed();
276 // Fast path: no successor asserted
277 if (successor == null) {
278 Verify.verify(local == SealState.SEALED);
280 final SettableFuture<Boolean> ret = SettableFuture.create();
281 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
282 if (t instanceof TransactionCommitSuccess) {
283 ret.set(Boolean.TRUE);
284 } else if (t instanceof RequestFailure) {
285 ret.setException(((RequestFailure<?, ?>) t).getCause());
287 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
290 // This is a terminal request, hence we do not need to record it
291 LOG.debug("Transaction {} directCommit completed", this);
292 parent.completeTransaction(this);
295 sealed = SealState.FLUSHED;
299 // We have a successor, take its latch
300 localLatch = successorLatch;
303 // Slow path: we need to wait for the successor to completely propagate
304 LOG.debug("{} waiting on successor latch", getIdentifier());
307 } catch (InterruptedException e) {
308 LOG.warn("{} interrupted while waiting for latch", getIdentifier());
309 throw Throwables.propagate(e);
312 synchronized (this) {
313 LOG.debug("{} reacquired lock", getIdentifier());
315 final SealState local = sealed;
316 Verify.verify(local == SealState.FLUSHED);
318 return successor.directCommit();
322 final void canCommit(final VotingFuture<?> ret) {
323 final CountDownLatch localLatch;
325 synchronized (this) {
326 final SealState local = checkSealed();
328 // Fast path: no successor asserted
329 if (successor == null) {
330 Verify.verify(local == SealState.SEALED);
332 final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
333 sendRequest(req, t -> {
334 if (t instanceof TransactionCanCommitSuccess) {
336 } else if (t instanceof RequestFailure) {
337 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
339 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
342 recordSuccessfulRequest(req);
343 LOG.debug("Transaction {} canCommit completed", this);
346 sealed = SealState.FLUSHED;
350 // We have a successor, take its latch
351 localLatch = successorLatch;
354 // Slow path: we need to wait for the successor to completely propagate
355 LOG.debug("{} waiting on successor latch", getIdentifier());
358 } catch (InterruptedException e) {
359 LOG.warn("{} interrupted while waiting for latch", getIdentifier());
360 throw Throwables.propagate(e);
363 synchronized (this) {
364 LOG.debug("{} reacquired lock", getIdentifier());
366 final SealState local = sealed;
367 Verify.verify(local == SealState.FLUSHED);
369 successor.canCommit(ret);
373 final void preCommit(final VotingFuture<?> ret) {
376 final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
378 sendRequest(req, t -> {
379 if (t instanceof TransactionPreCommitSuccess) {
381 } else if (t instanceof RequestFailure) {
382 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
384 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
387 recordSuccessfulRequest(req);
388 LOG.debug("Transaction {} preCommit completed", this);
392 void doCommit(final VotingFuture<?> ret) {
395 sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
396 if (t instanceof TransactionCommitSuccess) {
398 } else if (t instanceof RequestFailure) {
399 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
401 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
404 LOG.debug("Transaction {} doCommit completed", this);
405 parent.completeTransaction(this);
409 final synchronized void startReconnect(final AbstractProxyTransaction successor) {
410 Preconditions.checkState(this.successor == null);
411 this.successor = Preconditions.checkNotNull(successor);
413 for (Object obj : successfulRequests) {
414 if (obj instanceof TransactionRequest) {
415 LOG.debug("Forwarding request {} to successor {}", obj, successor);
416 successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
418 Verify.verify(obj instanceof IncrementSequence);
419 successor.incrementSequence(((IncrementSequence) obj).getDelta());
422 LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
423 successfulRequests.clear();
426 * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed
429 successorLatch = new CountDownLatch(1);
432 final synchronized void finishReconnect() {
433 Preconditions.checkState(successorLatch != null);
435 if (sealed == SealState.SEALED) {
437 * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current
438 * leftover state to the successor now.
440 flushState(successor);
442 sealed = SealState.FLUSHED;
445 // All done, release the latch, unblocking seal() and canCommit()
446 successorLatch.countDown();
450 * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
451 * and forwarded to the successor connection.
453 * @param request Request to be forwarded
454 * @param callback Original callback
455 * @throws RequestException when the request is unhandled by the successor
457 final synchronized void replayRequest(final TransactionRequest<?> request,
458 final Consumer<Response<?, ?>> callback) {
459 Preconditions.checkState(successor != null, "%s does not have a successor set", this);
461 if (successor instanceof LocalProxyTransaction) {
462 forwardToLocal((LocalProxyTransaction)successor, request, callback);
463 } else if (successor instanceof RemoteProxyTransaction) {
464 forwardToRemote((RemoteProxyTransaction)successor, request, callback);
466 throw new IllegalStateException("Unhandled successor " + successor);
470 abstract void doDelete(final YangInstanceIdentifier path);
472 abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
474 abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
476 abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
478 abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
479 final YangInstanceIdentifier path);
481 abstract void doSeal();
483 abstract void doAbort();
486 abstract void flushState(AbstractProxyTransaction successor);
488 abstract TransactionRequest<?> commitRequest(boolean coordinated);
491 * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
492 * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
493 * operations are packaged in the message.
496 * Note: this method is invoked by the predecessor on the successor.
498 * @param request Request which needs to be forwarded
499 * @param callback Callback to be invoked once the request completes
501 abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
502 @Nullable Consumer<Response<?, ?>> callback);
505 * Replay a request originating in this proxy to a successor remote proxy.
507 abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
508 Consumer<Response<?, ?>> callback);
511 * Replay a request originating in this proxy to a successor local proxy.
513 abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
514 Consumer<Response<?, ?>> callback);