BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.function.Consumer;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
20 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
21 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
22 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
23 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
24 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.RequestException;
28 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
29 import org.opendaylight.controller.cluster.access.concepts.Response;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.mdsal.common.api.ReadFailedException;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Class translating transaction operations towards a particular backend shard.
40  *
41  * <p>
42  * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
43  * transitions coming from interactions with backend are expected to be thread-safe.
44  *
45  * <p>
46  * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
47  * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
48  *
49  * @author Robert Varga
50  */
51 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
52     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
53
54     private final ProxyHistory parent;
55
56     private AbstractProxyTransaction successor;
57     private long sequence;
58     private boolean sealed;
59
60     AbstractProxyTransaction(final ProxyHistory parent) {
61         this.parent = Preconditions.checkNotNull(parent);
62     }
63
64     final ActorRef localActor() {
65         return parent.localActor();
66     }
67
68     final long nextSequence() {
69         return sequence++;
70     }
71
72     final void delete(final YangInstanceIdentifier path) {
73         checkNotSealed();
74         doDelete(path);
75     }
76
77     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
78         checkNotSealed();
79         doMerge(path, data);
80     }
81
82     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
83         checkNotSealed();
84         doWrite(path, data);
85     }
86
87     final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
88         checkNotSealed();
89         return doExists(path);
90     }
91
92     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
93         checkNotSealed();
94         return doRead(path);
95     }
96
97     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
98         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
99         parent.sendRequest(request, callback);
100     }
101
102     /**
103      * Seal this transaction before it is either committed or aborted.
104      */
105     final void seal() {
106         checkNotSealed();
107         doSeal();
108         sealed = true;
109         parent.onTransactionSealed(this);
110     }
111
112     private void checkNotSealed() {
113         Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
114     }
115
116     private void checkSealed() {
117         Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
118     }
119
120     /**
121      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
122      * being sent to the backend.
123      */
124     final void abort() {
125         checkNotSealed();
126         doAbort();
127         parent.abortTransaction(this);
128     }
129
130     final void abort(final VotingFuture<Void> ret) {
131         checkSealed();
132
133         sendAbort(t -> {
134             if (t instanceof TransactionAbortSuccess) {
135                 ret.voteYes();
136             } else if (t instanceof RequestFailure) {
137                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
138             } else {
139                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
140             }
141
142             parent.completeTransaction(this);
143         });
144     }
145
146     final void sendAbort(final Consumer<Response<?, ?>> callback) {
147         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
148     }
149
150     /**
151      * Commit this transaction, possibly in a coordinated fashion.
152      *
153      * @param coordinated True if this transaction should be coordinated across multiple participants.
154      * @return Future completion
155      */
156     final ListenableFuture<Boolean> directCommit() {
157         checkSealed();
158
159         final SettableFuture<Boolean> ret = SettableFuture.create();
160         sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
161             if (t instanceof TransactionCommitSuccess) {
162                 ret.set(Boolean.TRUE);
163             } else if (t instanceof RequestFailure) {
164                 ret.setException(((RequestFailure<?, ?>) t).getCause());
165             } else {
166                 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
167             }
168
169             parent.completeTransaction(this);
170         });
171         return ret;
172     }
173
174
175     void canCommit(final VotingFuture<?> ret) {
176         checkSealed();
177
178         sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
179             if (t instanceof TransactionCanCommitSuccess) {
180                 ret.voteYes();
181             } else if (t instanceof RequestFailure) {
182                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
183             } else {
184                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
185             }
186         });
187     }
188
189     void preCommit(final VotingFuture<?> ret) {
190         checkSealed();
191
192         sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
193             if (t instanceof TransactionPreCommitSuccess) {
194                 ret.voteYes();
195             } else if (t instanceof RequestFailure) {
196                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
197             } else {
198                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
199             }
200         });
201     }
202
203     void doCommit(final VotingFuture<?> ret) {
204         checkSealed();
205
206         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
207             if (t instanceof TransactionCommitSuccess) {
208                 ret.voteYes();
209             } else if (t instanceof RequestFailure) {
210                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
211             } else {
212                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
213             }
214
215             parent.completeTransaction(this);
216         });
217     }
218
219     void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
220         this.successor = Preconditions.checkNotNull(successor);
221     }
222
223     /**
224      * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
225      * and forwarded to the successor connection.
226      *
227      * @param request Request to be forwarded
228      * @param callback Original callback
229      * @throws RequestException when the request is unhandled by the successor
230      */
231     final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
232             throws RequestException {
233         Preconditions.checkState(successor != null, "%s does not have a successor set", this);
234
235         if (successor instanceof LocalProxyTransaction) {
236             forwardToLocal((LocalProxyTransaction)successor, request, callback);
237         } else if (successor instanceof RemoteProxyTransaction) {
238             forwardToRemote((RemoteProxyTransaction)successor, request, callback);
239         } else {
240             throw new IllegalStateException("Unhandled successor " + successor);
241         }
242     }
243
244     abstract void doDelete(final YangInstanceIdentifier path);
245
246     abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
247
248     abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
249
250     abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
251
252     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
253             final YangInstanceIdentifier path);
254
255     abstract void doSeal();
256
257     abstract void doAbort();
258
259     abstract TransactionRequest<?> commitRequest(boolean coordinated);
260
261     /**
262      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
263      * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
264      * operations are packaged in the message.
265      *
266      * <p>
267      * Note: this method is invoked by the predecessor on the successor.
268      *
269      * @param request Request which needs to be forwarded
270      * @param callback Callback to be invoked once the request completes
271      */
272     abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
273             @Nullable Consumer<Response<?, ?>> callback);
274
275     /**
276      * Replay a request originating in this proxy to a successor remote proxy.
277      */
278     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
279             Consumer<Response<?, ?>> callback) throws RequestException;
280
281     /**
282      * Replay a request originating in this proxy to a successor local proxy.
283      */
284     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
285             Consumer<Response<?, ?>> callback) throws RequestException;
286 }