Bump odlparent to 6.0.0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.concurrent.Executor;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
27 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
28 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
29 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34
35 /**
36  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
37  * decapsulating DataTreeChanged messages and dispatching their context to the user.
38  */
39 final class DataTreeCohortActor extends AbstractUntypedActor {
40     private final Idle idleState = new Idle();
41     private final DOMDataTreeCommitCohort cohort;
42     private final YangInstanceIdentifier registeredPath;
43     private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
44
45     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
46         this.cohort = Objects.requireNonNull(cohort);
47         this.registeredPath = Objects.requireNonNull(registeredPath);
48     }
49
50     @Override
51     protected void handleReceive(final Object message) {
52         if (!(message instanceof CommitProtocolCommand)) {
53             unknownMessage(message);
54             return;
55         }
56
57         CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
58         CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
59
60         LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
61                 currentState, message);
62
63         currentState.handle(command);
64     }
65
66     /**
67      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
68      *
69      * @param <R> Reply message type
70      */
71     abstract static class CommitProtocolCommand<R extends CommitReply> {
72
73         private final TransactionIdentifier txId;
74
75         final TransactionIdentifier getTxId() {
76             return txId;
77         }
78
79         protected CommitProtocolCommand(final TransactionIdentifier txId) {
80             this.txId = Objects.requireNonNull(txId);
81         }
82
83         @Override
84         public String toString() {
85             return getClass().getSimpleName() + " [txId=" + txId + "]";
86         }
87     }
88
89     static final class CanCommit extends CommitProtocolCommand<Success> {
90
91         private final Collection<DOMDataTreeCandidate> candidates;
92         private final ActorRef cohort;
93         private final SchemaContext schema;
94
95         CanCommit(final TransactionIdentifier txId, final Collection<DOMDataTreeCandidate> candidates,
96                 final SchemaContext schema, final ActorRef cohort) {
97             super(txId);
98             this.cohort = Objects.requireNonNull(cohort);
99             this.candidates = Objects.requireNonNull(candidates);
100             this.schema = Objects.requireNonNull(schema);
101         }
102
103         Collection<DOMDataTreeCandidate> getCandidates() {
104             return candidates;
105         }
106
107         SchemaContext getSchema() {
108             return schema;
109         }
110
111         ActorRef getCohort() {
112             return cohort;
113         }
114
115         @Override
116         public String toString() {
117             return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort  + "]";
118         }
119     }
120
121     abstract static class CommitReply {
122
123         private final ActorRef cohortRef;
124         private final TransactionIdentifier txId;
125
126         protected CommitReply(final ActorRef cohortRef, final TransactionIdentifier txId) {
127             this.cohortRef = Objects.requireNonNull(cohortRef);
128             this.txId = Objects.requireNonNull(txId);
129         }
130
131         ActorRef getCohort() {
132             return cohortRef;
133         }
134
135         final TransactionIdentifier getTxId() {
136             return txId;
137         }
138
139         @Override
140         public String toString() {
141             return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
142         }
143     }
144
145     static final class Success extends CommitReply {
146
147         Success(final ActorRef cohortRef, final TransactionIdentifier txId) {
148             super(cohortRef, txId);
149         }
150     }
151
152     static final class PreCommit extends CommitProtocolCommand<Success> {
153
154         PreCommit(final TransactionIdentifier txId) {
155             super(txId);
156         }
157     }
158
159     static final class Abort extends CommitProtocolCommand<Success> {
160
161         Abort(final TransactionIdentifier txId) {
162             super(txId);
163         }
164     }
165
166     static final class Commit extends CommitProtocolCommand<Success> {
167
168         Commit(final TransactionIdentifier txId) {
169             super(txId);
170         }
171     }
172
173     private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
174         private final Class<M> handledMessageType;
175
176         CohortBehaviour(final Class<M> handledMessageType) {
177             this.handledMessageType = Objects.requireNonNull(handledMessageType);
178         }
179
180         void handle(final CommitProtocolCommand<?> command) {
181             if (handledMessageType.isInstance(command)) {
182                 onMessage(command);
183             } else if (command instanceof Abort) {
184                 onAbort(((Abort)command).getTxId());
185             } else {
186                 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
187                         "Unexpected message %s in cohort behavior %s", command.getClass(),
188                         getClass().getSimpleName()))), getSelf());
189             }
190         }
191
192         private void onMessage(final CommitProtocolCommand<?> message) {
193             final ActorRef sender = getSender();
194             TransactionIdentifier txId = message.getTxId();
195             ListenableFuture<S> future = process(handledMessageType.cast(message));
196             Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
197                     : DataTreeCohortActor.this::executeInSelf;
198             Futures.addCallback(future, new FutureCallback<S>() {
199                 @Override
200                 public void onSuccess(final S nextStep) {
201                     success(txId, sender, nextStep);
202                 }
203
204                 @Override
205                 public void onFailure(final Throwable failure) {
206                     failed(txId, sender, failure);
207                 }
208             }, callbackExecutor);
209         }
210
211         @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
212                 justification = "https://github.com/spotbugs/spotbugs/issues/811")
213         private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
214             currentStateMap.remove(txId);
215             sender.tell(new Status.Failure(failure), getSelf());
216         }
217
218         @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
219                 justification = "https://github.com/spotbugs/spotbugs/issues/811")
220         private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
221             currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
222             sender.tell(new Success(getSelf(), txId), getSelf());
223         }
224
225         private void onAbort(final TransactionIdentifier txId) {
226             currentStateMap.remove(txId);
227             final ActorRef sender = getSender();
228             Futures.addCallback(abort(), new FutureCallback<Object>() {
229                 @Override
230                 public void onSuccess(final Object noop) {
231                     sender.tell(new Success(getSelf(), txId), getSelf());
232                 }
233
234                 @Override
235                 public void onFailure(final Throwable failure) {
236                     LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
237                     sender.tell(new Status.Failure(failure), getSelf());
238                 }
239             }, MoreExecutors.directExecutor());
240         }
241
242         abstract @Nullable CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
243
244         abstract @NonNull ListenableFuture<S> process(M command);
245
246         abstract ListenableFuture<?> abort();
247
248         @Override
249         public String toString() {
250             return getClass().getSimpleName();
251         }
252     }
253
254     private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
255         Idle() {
256             super(CanCommit.class);
257         }
258
259         @Override
260         ListenableFuture<PostCanCommitStep> process(final CanCommit message) {
261             return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
262         }
263
264         @Override
265         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
266             return new PostCanCommit(txId, nextStep);
267         }
268
269         @Override
270         ListenableFuture<?> abort() {
271             return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
272         }
273     }
274
275     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
276             N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
277         private final S step;
278         private final TransactionIdentifier txId;
279
280         CohortStateWithStep(final Class<M> handledMessageType, final TransactionIdentifier txId, final S step) {
281             super(handledMessageType);
282             this.txId = Objects.requireNonNull(txId);
283             this.step = Objects.requireNonNull(step);
284         }
285
286         final S getStep() {
287             return step;
288         }
289
290         @Override
291         ListenableFuture<?> abort() {
292             return getStep().abort();
293         }
294
295         @Override
296         public String toString() {
297             return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
298         }
299     }
300
301     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
302
303         PostCanCommit(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
304             super(PreCommit.class, txId, nextStep);
305         }
306
307         @SuppressWarnings("unchecked")
308         @Override
309         ListenableFuture<PostPreCommitStep> process(final PreCommit message) {
310             return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
311         }
312
313         @Override
314         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostPreCommitStep nextStep) {
315             return new PostPreCommit(txId, nextStep);
316         }
317
318     }
319
320     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
321
322         PostPreCommit(final TransactionIdentifier txId, final PostPreCommitStep step) {
323             super(Commit.class, txId, step);
324         }
325
326         @SuppressWarnings("unchecked")
327         @Override
328         ListenableFuture<NoopThreePhaseCommitStep> process(final Commit message) {
329             return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
330         }
331
332         @Override
333         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final NoopThreePhaseCommitStep nextStep) {
334             return null;
335         }
336     }
337
338     private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
339     }
340
341     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
342         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
343     }
344 }