803908d8c603848683d55d74d897c65bd9cd9931
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.ArrayDeque;
18 import java.util.Deque;
19 import java.util.function.Consumer;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
23 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
24 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
25 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
26 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
30 import org.opendaylight.controller.cluster.access.concepts.RequestException;
31 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.mdsal.common.api.ReadFailedException;
35 import org.opendaylight.yangtools.concepts.Identifiable;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Class translating transaction operations towards a particular backend shard.
43  *
44  * <p>
45  * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
46  * transitions coming from interactions with backend are expected to be thread-safe.
47  *
48  * <p>
49  * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
50  * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
51  *
52  * @author Robert Varga
53  */
54 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
55     private static final class IncrementSequence {
56         private long delta = 1;
57
58         long getDelta() {
59             return delta;
60         }
61
62         void incrementDelta() {
63             delta++;
64         }
65     }
66
67     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
68
69     private final Deque<Object> successfulRequests = new ArrayDeque<>();
70     private final ProxyHistory parent;
71
72     private AbstractProxyTransaction successor;
73     private long sequence;
74     private boolean sealed;
75
76     AbstractProxyTransaction(final ProxyHistory parent) {
77         this.parent = Preconditions.checkNotNull(parent);
78     }
79
80     final ActorRef localActor() {
81         return parent.localActor();
82     }
83
84     private void incrementSequence(final long delta) {
85         sequence += delta;
86         LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
87     }
88
89     final long nextSequence() {
90         final long ret = sequence++;
91         LOG.debug("Transaction {} allocated sequence {}", this, ret);
92         return ret;
93     }
94
95     final void delete(final YangInstanceIdentifier path) {
96         checkNotSealed();
97         doDelete(path);
98     }
99
100     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
101         checkNotSealed();
102         doMerge(path, data);
103     }
104
105     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
106         checkNotSealed();
107         doWrite(path, data);
108     }
109
110     final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
111         checkNotSealed();
112         return doExists(path);
113     }
114
115     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
116         checkNotSealed();
117         return doRead(path);
118     }
119
120     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
121         LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
122         parent.sendRequest(request, callback);
123     }
124
125     /**
126      * Seal this transaction before it is either committed or aborted.
127      */
128     final void seal() {
129         checkNotSealed();
130         doSeal();
131         sealed = true;
132         parent.onTransactionSealed(this);
133     }
134
135     private void checkNotSealed() {
136         Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
137     }
138
139     private void checkSealed() {
140         Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
141     }
142
143     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
144         successfulRequests.add(Verify.verifyNotNull(req));
145     }
146
147     final void recordFinishedRequest() {
148         final Object last = successfulRequests.peekLast();
149         if (last instanceof IncrementSequence) {
150             ((IncrementSequence) last).incrementDelta();
151         } else {
152             successfulRequests.addLast(new IncrementSequence());
153         }
154     }
155
156     /**
157      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
158      * being sent to the backend.
159      */
160     final void abort() {
161         checkNotSealed();
162         doAbort();
163         parent.abortTransaction(this);
164     }
165
166     final void abort(final VotingFuture<Void> ret) {
167         checkSealed();
168
169         sendAbort(t -> {
170             if (t instanceof TransactionAbortSuccess) {
171                 ret.voteYes();
172             } else if (t instanceof RequestFailure) {
173                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
174             } else {
175                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
176             }
177
178             // This is a terminal request, hence we do not need to record it
179             LOG.debug("Transaction {} abort completed", this);
180             parent.completeTransaction(this);
181         });
182     }
183
184     final void sendAbort(final Consumer<Response<?, ?>> callback) {
185         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
186     }
187
188     /**
189      * Commit this transaction, possibly in a coordinated fashion.
190      *
191      * @param coordinated True if this transaction should be coordinated across multiple participants.
192      * @return Future completion
193      */
194     final ListenableFuture<Boolean> directCommit() {
195         checkSealed();
196
197         final SettableFuture<Boolean> ret = SettableFuture.create();
198         sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
199             if (t instanceof TransactionCommitSuccess) {
200                 ret.set(Boolean.TRUE);
201             } else if (t instanceof RequestFailure) {
202                 ret.setException(((RequestFailure<?, ?>) t).getCause());
203             } else {
204                 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
205             }
206
207             // This is a terminal request, hence we do not need to record it
208             LOG.debug("Transaction {} directCommit completed", this);
209             parent.completeTransaction(this);
210         });
211         return ret;
212     }
213
214
215     void canCommit(final VotingFuture<?> ret) {
216         checkSealed();
217
218         final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
219         sendRequest(req, t -> {
220             if (t instanceof TransactionCanCommitSuccess) {
221                 ret.voteYes();
222             } else if (t instanceof RequestFailure) {
223                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
224             } else {
225                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
226             }
227
228             recordSuccessfulRequest(req);
229             LOG.debug("Transaction {} canCommit completed", this);
230         });
231     }
232
233     void preCommit(final VotingFuture<?> ret) {
234         checkSealed();
235
236         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
237             localActor());
238         sendRequest(req, t -> {
239             if (t instanceof TransactionPreCommitSuccess) {
240                 ret.voteYes();
241             } else if (t instanceof RequestFailure) {
242                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
243             } else {
244                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
245             }
246
247             recordSuccessfulRequest(req);
248             LOG.debug("Transaction {} preCommit completed", this);
249         });
250     }
251
252     void doCommit(final VotingFuture<?> ret) {
253         checkSealed();
254
255         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
256             if (t instanceof TransactionCommitSuccess) {
257                 ret.voteYes();
258             } else if (t instanceof RequestFailure) {
259                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
260             } else {
261                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
262             }
263
264             LOG.debug("Transaction {} doCommit completed", this);
265             parent.completeTransaction(this);
266         });
267     }
268
269     final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
270         this.successor = Preconditions.checkNotNull(successor);
271
272         for (Object obj : successfulRequests) {
273             if (obj instanceof TransactionRequest) {
274                 LOG.debug("Forwarding request {} to successor {}", obj, successor);
275                 successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
276             } else {
277                 Verify.verify(obj instanceof IncrementSequence);
278                 successor.incrementSequence(((IncrementSequence) obj).getDelta());
279             }
280         }
281         LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
282         successfulRequests.clear();
283     }
284
285     /**
286      * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
287      * and forwarded to the successor connection.
288      *
289      * @param request Request to be forwarded
290      * @param callback Original callback
291      * @throws RequestException when the request is unhandled by the successor
292      */
293     final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
294             throws RequestException {
295         Preconditions.checkState(successor != null, "%s does not have a successor set", this);
296
297         if (successor instanceof LocalProxyTransaction) {
298             forwardToLocal((LocalProxyTransaction)successor, request, callback);
299         } else if (successor instanceof RemoteProxyTransaction) {
300             forwardToRemote((RemoteProxyTransaction)successor, request, callback);
301         } else {
302             throw new IllegalStateException("Unhandled successor " + successor);
303         }
304     }
305
306     abstract void doDelete(final YangInstanceIdentifier path);
307
308     abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
309
310     abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
311
312     abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
313
314     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
315             final YangInstanceIdentifier path);
316
317     abstract void doSeal();
318
319     abstract void doAbort();
320
321     abstract TransactionRequest<?> commitRequest(boolean coordinated);
322
323     /**
324      * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
325      * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
326      * operations are packaged in the message.
327      *
328      * <p>
329      * Note: this method is invoked by the predecessor on the successor.
330      *
331      * @param request Request which needs to be forwarded
332      * @param callback Callback to be invoked once the request completes
333      */
334     abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
335             @Nullable Consumer<Response<?, ?>> callback);
336
337     /**
338      * Replay a request originating in this proxy to a successor remote proxy.
339      */
340     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
341             Consumer<Response<?, ?>> callback) throws RequestException;
342
343     /**
344      * Replay a request originating in this proxy to a successor local proxy.
345      */
346     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
347             Consumer<Response<?, ?>> callback) throws RequestException;
348 }