2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.ArrayDeque;
18 import java.util.Deque;
19 import java.util.function.Consumer;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
23 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
24 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
25 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
26 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
30 import org.opendaylight.controller.cluster.access.concepts.RequestException;
31 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.mdsal.common.api.ReadFailedException;
35 import org.opendaylight.yangtools.concepts.Identifiable;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * Class translating transaction operations towards a particular backend shard.
45 * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
46 * transitions coming from interactions with backend are expected to be thread-safe.
49 * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
50 * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
52 * @author Robert Varga
54 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
55 private static final class IncrementSequence {
56 private long delta = 1;
62 void incrementDelta() {
67 private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
69 private final Deque<Object> successfulRequests = new ArrayDeque<>();
70 private final ProxyHistory parent;
72 private AbstractProxyTransaction successor;
73 private long sequence;
74 private boolean sealed;
76 AbstractProxyTransaction(final ProxyHistory parent) {
77 this.parent = Preconditions.checkNotNull(parent);
80 final ActorRef localActor() {
81 return parent.localActor();
84 private void incrementSequence(final long delta) {
86 LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
89 final long nextSequence() {
90 final long ret = sequence++;
91 LOG.debug("Transaction {} allocated sequence {}", this, ret);
95 final void delete(final YangInstanceIdentifier path) {
100 final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
105 final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
110 final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
112 return doExists(path);
115 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
120 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
121 LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
122 parent.sendRequest(request, callback);
126 * Seal this transaction before it is either committed or aborted.
132 parent.onTransactionSealed(this);
135 private void checkNotSealed() {
136 Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
139 private void checkSealed() {
140 Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
143 final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
144 successfulRequests.add(Verify.verifyNotNull(req));
147 final void recordFinishedRequest() {
148 final Object last = successfulRequests.peekLast();
149 if (last instanceof IncrementSequence) {
150 ((IncrementSequence) last).incrementDelta();
152 successfulRequests.addLast(new IncrementSequence());
157 * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
158 * being sent to the backend.
163 parent.abortTransaction(this);
166 final void abort(final VotingFuture<Void> ret) {
170 if (t instanceof TransactionAbortSuccess) {
172 } else if (t instanceof RequestFailure) {
173 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
175 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
178 // This is a terminal request, hence we do not need to record it
179 LOG.debug("Transaction {} abort completed", this);
180 parent.completeTransaction(this);
184 final void sendAbort(final Consumer<Response<?, ?>> callback) {
185 sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
189 * Commit this transaction, possibly in a coordinated fashion.
191 * @param coordinated True if this transaction should be coordinated across multiple participants.
192 * @return Future completion
194 final ListenableFuture<Boolean> directCommit() {
197 final SettableFuture<Boolean> ret = SettableFuture.create();
198 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
199 if (t instanceof TransactionCommitSuccess) {
200 ret.set(Boolean.TRUE);
201 } else if (t instanceof RequestFailure) {
202 ret.setException(((RequestFailure<?, ?>) t).getCause());
204 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
207 // This is a terminal request, hence we do not need to record it
208 LOG.debug("Transaction {} directCommit completed", this);
209 parent.completeTransaction(this);
215 void canCommit(final VotingFuture<?> ret) {
218 final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
219 sendRequest(req, t -> {
220 if (t instanceof TransactionCanCommitSuccess) {
222 } else if (t instanceof RequestFailure) {
223 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
225 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
228 recordSuccessfulRequest(req);
229 LOG.debug("Transaction {} canCommit completed", this);
233 void preCommit(final VotingFuture<?> ret) {
236 final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
238 sendRequest(req, t -> {
239 if (t instanceof TransactionPreCommitSuccess) {
241 } else if (t instanceof RequestFailure) {
242 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
244 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
247 recordSuccessfulRequest(req);
248 LOG.debug("Transaction {} preCommit completed", this);
252 void doCommit(final VotingFuture<?> ret) {
255 sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
256 if (t instanceof TransactionCommitSuccess) {
258 } else if (t instanceof RequestFailure) {
259 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
261 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
264 LOG.debug("Transaction {} doCommit completed", this);
265 parent.completeTransaction(this);
269 final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
270 this.successor = Preconditions.checkNotNull(successor);
272 for (Object obj : successfulRequests) {
273 if (obj instanceof TransactionRequest) {
274 LOG.debug("Forwarding request {} to successor {}", obj, successor);
275 successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
277 Verify.verify(obj instanceof IncrementSequence);
278 successor.incrementSequence(((IncrementSequence) obj).getDelta());
281 LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
282 successfulRequests.clear();
286 * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
287 * and forwarded to the successor connection.
289 * @param request Request to be forwarded
290 * @param callback Original callback
291 * @throws RequestException when the request is unhandled by the successor
293 final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
294 throws RequestException {
295 Preconditions.checkState(successor != null, "%s does not have a successor set", this);
297 if (successor instanceof LocalProxyTransaction) {
298 forwardToLocal((LocalProxyTransaction)successor, request, callback);
299 } else if (successor instanceof RemoteProxyTransaction) {
300 forwardToRemote((RemoteProxyTransaction)successor, request, callback);
302 throw new IllegalStateException("Unhandled successor " + successor);
306 abstract void doDelete(final YangInstanceIdentifier path);
308 abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
310 abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
312 abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
314 abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
315 final YangInstanceIdentifier path);
317 abstract void doSeal();
319 abstract void doAbort();
321 abstract TransactionRequest<?> commitRequest(boolean coordinated);
324 * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
325 * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
326 * operations are packaged in the message.
329 * Note: this method is invoked by the predecessor on the successor.
331 * @param request Request which needs to be forwarded
332 * @param callback Callback to be invoked once the request completes
334 abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
335 @Nullable Consumer<Response<?, ?>> callback);
338 * Replay a request originating in this proxy to a successor remote proxy.
340 abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
341 Consumer<Response<?, ?>> callback) throws RequestException;
344 * Replay a request originating in this proxy to a successor local proxy.
346 abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
347 Consumer<Response<?, ?>> callback) throws RequestException;