8ff8b8eff9cb5850d1c7a8bf5c39e408627355a4
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.function.Consumer;
18 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
19 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
20 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
21 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
22 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
23 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
24 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
25 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
26 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
27 import org.opendaylight.controller.cluster.access.concepts.Response;
28 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.yangtools.concepts.Identifiable;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33
34 /**
35  * Class translating transaction operations towards a particular backend shard.
36  *
37  * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
38  * transitions coming from interactions with backend are expected to be thread-safe.
39  *
40  * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
41  * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
42  *
43  * @author Robert Varga
44  */
45 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
46     private final DistributedDataStoreClientBehavior client;
47
48     private long sequence;
49     private boolean sealed;
50
51     AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) {
52         this.client = Preconditions.checkNotNull(client);
53     }
54
55     final ActorRef localActor() {
56         return client.self();
57     }
58
59     final long nextSequence() {
60         return sequence++;
61     }
62
63     final void delete(final YangInstanceIdentifier path) {
64         checkSealed();
65         doDelete(path);
66     }
67
68     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
69         checkSealed();
70         doMerge(path, data);
71     }
72
73     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
74         checkSealed();
75         doWrite(path, data);
76     }
77
78     final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
79         checkSealed();
80         return doExists(path);
81     }
82
83     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
84         checkSealed();
85         return doRead(path);
86     }
87
88     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
89         client.sendRequest(request, completer);
90     }
91
92     /**
93      * Seal this transaction before it is either
94      */
95     final void seal() {
96         checkSealed();
97         doSeal();
98         sealed = true;
99     }
100
101     private void checkSealed() {
102         Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
103     }
104
105     /**
106      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
107      * being sent to the backend.
108      */
109     final void abort() {
110         checkSealed();
111         doAbort();
112     }
113
114     /**
115      * Commit this transaction, possibly in a coordinated fashion.
116      *
117      * @param coordinated True if this transaction should be coordinated across multiple participants.
118      * @return Future completion
119      */
120     final ListenableFuture<Boolean> directCommit() {
121         checkSealed();
122
123         final SettableFuture<Boolean> ret = SettableFuture.create();
124         sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
125             if (t instanceof TransactionCommitSuccess) {
126                 ret.set(Boolean.TRUE);
127             } else if (t instanceof RequestFailure) {
128                 ret.setException(((RequestFailure<?, ?>) t).getCause());
129             } else {
130                 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
131             }
132         });
133         return ret;
134     }
135
136     void abort(final VotingFuture<Void> ret) {
137         checkSealed();
138
139         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
140             if (t instanceof TransactionAbortSuccess) {
141                 ret.voteYes();
142             } else if (t instanceof RequestFailure) {
143                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
144             } else {
145                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
146             }
147         });
148     }
149
150     void canCommit(final VotingFuture<?> ret) {
151         checkSealed();
152
153         sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
154             if (t instanceof TransactionCanCommitSuccess) {
155                 ret.voteYes();
156             } else if (t instanceof RequestFailure) {
157                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
158             } else {
159                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
160             }
161         });
162     }
163
164     void preCommit(final VotingFuture<?> ret) {
165         checkSealed();
166
167         sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
168             if (t instanceof TransactionPreCommitSuccess) {
169                 ret.voteYes();
170             } else if (t instanceof RequestFailure) {
171                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
172             } else {
173                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
174             }
175         });
176     }
177
178     void doCommit(final VotingFuture<?> ret) {
179         checkSealed();
180
181         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
182             if (t instanceof TransactionCommitSuccess) {
183                 ret.voteYes();
184             } else if (t instanceof RequestFailure) {
185                 ret.voteNo(((RequestFailure<?, ?>) t).getCause());
186             } else {
187                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
188             }
189         });
190     }
191
192     abstract void doDelete(final YangInstanceIdentifier path);
193
194     abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
195
196     abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
197
198     abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
199
200     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path);
201
202     abstract void doSeal();
203
204     abstract void doAbort();
205
206     abstract TransactionRequest<?> doCommit(boolean coordinated);
207 }