2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.function.Consumer;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
20 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
21 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
22 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
23 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
24 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.RequestException;
28 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
29 import org.opendaylight.controller.cluster.access.concepts.Response;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.mdsal.common.api.ReadFailedException;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * Class translating transaction operations towards a particular backend shard.
42 * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
43 * transitions coming from interactions with the backend are expected to be thread-safe.
46 * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
47 * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
49 * @author Robert Varga
51 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
// Logger shared by all proxy transaction instances.
52 private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
// History this transaction is attached to; requests and lifecycle notifications are routed through it.
54 private final ProxyHistory parent;
// Transaction which receives replayed requests; assigned by replaySuccessfulRequests() and null before that.
56 private AbstractProxyTransaction successor;
// NOTE(review): presumably the counter behind nextSequence() -- its use is not visible in this view; confirm.
57 private long sequence;
// Sealed flag consulted by checkNotSealed()/checkSealed().
// NOTE(review): the place where it is set is not visible in this view.
58 private boolean sealed;
/**
 * Creates a proxy transaction attached to the supplied history.
 *
 * @param parent history through which this transaction communicates, must not be null
 * @throws NullPointerException if {@code parent} is null
 */
AbstractProxyTransaction(final ProxyHistory parent) {
    Preconditions.checkNotNull(parent);
    this.parent = parent;
}
64 final ActorRef localActor() {
65 return parent.localActor();
// Supplies the sequence number which the visible callers stamp into abort, pre-commit and do-commit requests.
// NOTE(review): the method body is not visible in this view -- presumably it advances the `sequence` field; confirm.
68 final long nextSequence() {
// NOTE(review): body not visible in this view -- presumably verifies the transaction is not sealed and then
// delegates to doDelete(path); confirm against the full file.
72 final void delete(final YangInstanceIdentifier path) {
// NOTE(review): body not visible in this view -- presumably verifies the transaction is not sealed and then
// delegates to doMerge(path, data); confirm against the full file.
77 final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
// NOTE(review): body not visible in this view -- presumably verifies the transaction is not sealed and then
// delegates to doWrite(path, data); confirm against the full file.
82 final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
// Checks whether data exists at the given path by delegating to the doExists() implementation hook and
// returning its future unchanged.
// NOTE(review): a line between the signature and the return is missing from this view (likely a sealed-state
// check); confirm against the full file.
87 final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
89 return doExists(path);
// NOTE(review): body not visible in this view -- presumably mirrors exists() and delegates to doRead(path);
// confirm against the full file.
92 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
97 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
98 LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
99 parent.sendRequest(request, callback);
103 * Seal this transaction before it is either committed or aborted.
109 parent.onTransactionSealed(this);
112 private void checkNotSealed() {
113 Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
116 private void checkSealed() {
117 Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
121 * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
122 * being sent to the backend.
127 parent.abortTransaction(this);
// Abort variant which reports the outcome as a vote on the supplied future.
// NOTE(review): several lines of this method are missing from this view, including the statement which sends the
// abort and the one executed on TransactionAbortSuccess; confirm against the full file.
130 final void abort(final VotingFuture<Void> ret) {
134 if (t instanceof TransactionAbortSuccess) {
// Backend reported a failure: vote no with the cause it carried.
136 } else if (t instanceof RequestFailure) {
137 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
// Neither success nor failure: vote no, naming the unexpected response class.
139 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
// Let the parent history know this transaction has completed.
142 parent.completeTransaction(this);
146 final void sendAbort(final Consumer<Response<?, ?>> callback) {
147 sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
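151 * Commit this transaction directly, without coordination with other participants.
153 * The commit request is obtained via {@code commitRequest(false)}, i.e. in uncoordinated mode.
154 * @return Future which completes with {@code true} on commit success and fails on a failure or unexpected response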
// Sends the uncoordinated commit request and exposes the outcome as a future.
// NOTE(review): some lines of this method (before the future is created and after the callback) are missing from
// this view; confirm against the full file.
156 final ListenableFuture<Boolean> directCommit() {
159 final SettableFuture<Boolean> ret = SettableFuture.create();
// commitRequest(false) must not return null; verifyNotNull enforces that before the request is sent.
160 sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
161 if (t instanceof TransactionCommitSuccess) {
162 ret.set(Boolean.TRUE);
// Backend reported a failure: fail the future with the cause it carried.
163 } else if (t instanceof RequestFailure) {
164 ret.setException(((RequestFailure<?, ?>) t).getCause());
// Neither success nor failure: fail the future, naming the unexpected response class.
166 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
// Let the parent history know this transaction has completed.
169 parent.completeTransaction(this);
// Sends the coordinated commit request (commitRequest(true), verified to be non-null) and turns the response
// into a vote on ret.
// NOTE(review): some lines of this method, including the statement executed on TransactionCanCommitSuccess, are
// missing from this view; confirm against the full file.
175 void canCommit(final VotingFuture<?> ret) {
178 sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
179 if (t instanceof TransactionCanCommitSuccess) {
// Backend reported a failure: vote no with the cause it carried.
181 } else if (t instanceof RequestFailure) {
182 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
// Neither success nor failure: vote no, naming the unexpected response class.
184 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
// Sends a TransactionPreCommitRequest carrying the next sequence number and turns the response into a vote on ret.
// NOTE(review): some lines of this method, including the statement executed on TransactionPreCommitSuccess, are
// missing from this view; confirm against the full file.
189 void preCommit(final VotingFuture<?> ret) {
192 sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
193 if (t instanceof TransactionPreCommitSuccess) {
// Backend reported a failure: vote no with the cause it carried.
195 } else if (t instanceof RequestFailure) {
196 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
// Neither success nor failure: vote no, naming the unexpected response class.
198 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
// Sends a TransactionDoCommitRequest carrying the next sequence number and turns the response into a vote on ret.
// NOTE(review): some lines of this method, including the statement executed on TransactionCommitSuccess, are
// missing from this view; confirm against the full file.
203 void doCommit(final VotingFuture<?> ret) {
206 sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
207 if (t instanceof TransactionCommitSuccess) {
// Backend reported a failure: vote no with the cause it carried.
209 } else if (t instanceof RequestFailure) {
210 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
// Neither success nor failure: vote no, naming the unexpected response class.
212 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
// Let the parent history know this transaction has completed.
215 parent.completeTransaction(this);
219 void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
220 this.successor = Preconditions.checkNotNull(successor);
224 * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
225 * and forwarded to the successor connection.
227 * @param request Request to be forwarded
228 * @param callback Original callback
229 * @throws RequestException when the request is unhandled by the successor
231 final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
232 throws RequestException {
233 Preconditions.checkState(successor != null, "%s does not have a successor set", this);
235 if (successor instanceof LocalProxyTransaction) {
236 forwardToLocal((LocalProxyTransaction)successor, request, callback);
237 } else if (successor instanceof RemoteProxyTransaction) {
238 forwardToRemote((RemoteProxyTransaction)successor, request, callback);
240 throw new IllegalStateException("Unhandled successor " + successor);
244 abstract void doDelete(final YangInstanceIdentifier path);
246 abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
248 abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
250 abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
252 abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
253 final YangInstanceIdentifier path);
255 abstract void doSeal();
257 abstract void doAbort();
259 abstract TransactionRequest<?> commitRequest(boolean coordinated);
262 * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
263 * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
264 * operations are packaged in the message.
267 * Note: this method is invoked by the predecessor on the successor.
269 * @param request Request which needs to be forwarded
270 * @param callback Callback to be invoked once the request completes
272 abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
273 @Nullable Consumer<Response<?, ?>> callback);
276 * Replay a request originating in this proxy to a successor remote proxy.
278 abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
279 Consumer<Response<?, ?>> callback) throws RequestException;
282 * Replay a request originating in this proxy to a successor local proxy.
284 abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
285 Consumer<Response<?, ?>> callback) throws RequestException;