Adjust to DOMDataTreeCommitCohort API change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.concurrent.Executor;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
27 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
28 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
29 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34
/**
 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
 * decapsulating {@link CommitProtocolCommand} messages and dispatching them to the user's cohort,
 * tracking the commit protocol state of each transaction separately.
 */
39 final class DataTreeCohortActor extends AbstractUntypedActor {
40     private final Idle idleState = new Idle();
41     private final DOMDataTreeCommitCohort cohort;
42     private final YangInstanceIdentifier registeredPath;
43     private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
44
45     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
46         this.cohort = Objects.requireNonNull(cohort);
47         this.registeredPath = Objects.requireNonNull(registeredPath);
48     }
49
50     @Override
51     protected void handleReceive(final Object message) {
52         if (!(message instanceof CommitProtocolCommand)) {
53             unknownMessage(message);
54             return;
55         }
56
57         CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
58         CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
59
60         LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
61                 currentState, message);
62
63         currentState.handle(command);
64     }
65
66     /**
67      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
68      *
69      * @param <R> Reply message type
70      */
71     abstract static class CommitProtocolCommand<R extends CommitReply> {
72
73         private final TransactionIdentifier txId;
74
75         final TransactionIdentifier getTxId() {
76             return txId;
77         }
78
79         protected CommitProtocolCommand(TransactionIdentifier txId) {
80             this.txId = Objects.requireNonNull(txId);
81         }
82
83         @Override
84         public String toString() {
85             return getClass().getSimpleName() + " [txId=" + txId + "]";
86         }
87     }
88
89     static final class CanCommit extends CommitProtocolCommand<Success> {
90
91         private final Collection<DOMDataTreeCandidate> candidates;
92         private final ActorRef cohort;
93         private final SchemaContext schema;
94
95         CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
96                 ActorRef cohort) {
97             super(txId);
98             this.cohort = Objects.requireNonNull(cohort);
99             this.candidates = Objects.requireNonNull(candidates);
100             this.schema = Objects.requireNonNull(schema);
101         }
102
103         Collection<DOMDataTreeCandidate> getCandidates() {
104             return candidates;
105         }
106
107         SchemaContext getSchema() {
108             return schema;
109         }
110
111         ActorRef getCohort() {
112             return cohort;
113         }
114
115         @Override
116         public String toString() {
117             return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort  + "]";
118         }
119     }
120
121     abstract static class CommitReply {
122
123         private final ActorRef cohortRef;
124         private final TransactionIdentifier txId;
125
126         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
127             this.cohortRef = Objects.requireNonNull(cohortRef);
128             this.txId = Objects.requireNonNull(txId);
129         }
130
131         ActorRef getCohort() {
132             return cohortRef;
133         }
134
135         final TransactionIdentifier getTxId() {
136             return txId;
137         }
138
139         @Override
140         public String toString() {
141             return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
142         }
143     }
144
145     static final class Success extends CommitReply {
146
147         Success(ActorRef cohortRef, TransactionIdentifier txId) {
148             super(cohortRef, txId);
149         }
150     }
151
152     static final class PreCommit extends CommitProtocolCommand<Success> {
153
154         PreCommit(TransactionIdentifier txId) {
155             super(txId);
156         }
157     }
158
159     static final class Abort extends CommitProtocolCommand<Success> {
160
161         Abort(TransactionIdentifier txId) {
162             super(txId);
163         }
164     }
165
166     static final class Commit extends CommitProtocolCommand<Success> {
167
168         Commit(TransactionIdentifier txId) {
169             super(txId);
170         }
171     }
172
173     private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
174         private final Class<M> handledMessageType;
175
176         CohortBehaviour(Class<M> handledMessageType) {
177             this.handledMessageType = Objects.requireNonNull(handledMessageType);
178         }
179
180         void handle(CommitProtocolCommand<?> command) {
181             if (handledMessageType.isInstance(command)) {
182                 onMessage(command);
183             } else if (command instanceof Abort) {
184                 onAbort(((Abort)command).getTxId());
185             } else {
186                 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
187                         "Unexpected message %s in cohort behavior %s", command.getClass(),
188                         getClass().getSimpleName()))), getSelf());
189             }
190         }
191
192         private void onMessage(CommitProtocolCommand<?> message) {
193             final ActorRef sender = getSender();
194             TransactionIdentifier txId = message.getTxId();
195             ListenableFuture<S> future = process(handledMessageType.cast(message));
196             Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
197                     : DataTreeCohortActor.this::executeInSelf;
198             Futures.addCallback(future, new FutureCallback<S>() {
199                 @Override
200                 public void onSuccess(S nextStep) {
201                     success(txId, sender, nextStep);
202                 }
203
204                 @Override
205                 public void onFailure(Throwable failure) {
206                     failed(txId, sender, failure);
207                 }
208             }, callbackExecutor);
209         }
210
211         private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
212             currentStateMap.remove(txId);
213             sender.tell(new Status.Failure(failure), getSelf());
214         }
215
216         private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
217             currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
218             sender.tell(new Success(getSelf(), txId), getSelf());
219         }
220
221         private void onAbort(TransactionIdentifier txId) {
222             currentStateMap.remove(txId);
223             final ActorRef sender = getSender();
224             Futures.addCallback(abort(), new FutureCallback<Object>() {
225                 @Override
226                 public void onSuccess(Object noop) {
227                     sender.tell(new Success(getSelf(), txId), getSelf());
228                 }
229
230                 @Override
231                 public void onFailure(Throwable failure) {
232                     LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
233                     sender.tell(new Status.Failure(failure), getSelf());
234                 }
235             }, MoreExecutors.directExecutor());
236         }
237
238         @Nullable
239         abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
240
241         @Nonnull
242         abstract ListenableFuture<S> process(M command);
243
244         abstract ListenableFuture<?> abort();
245
246         @Override
247         public String toString() {
248             return getClass().getSimpleName();
249         }
250     }
251
252     private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
253         Idle() {
254             super(CanCommit.class);
255         }
256
257         @Override
258         ListenableFuture<PostCanCommitStep> process(CanCommit message) {
259             return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
260         }
261
262         @Override
263         CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) {
264             return new PostCanCommit(txId, nextStep);
265         }
266
267         @Override
268         ListenableFuture<?> abort() {
269             return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
270         }
271     }
272
273     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
274             N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
275         private final S step;
276         private final TransactionIdentifier txId;
277
278         CohortStateWithStep(Class<M> handledMessageType, TransactionIdentifier txId, S step) {
279             super(handledMessageType);
280             this.txId = Objects.requireNonNull(txId);
281             this.step = Objects.requireNonNull(step);
282         }
283
284         final S getStep() {
285             return step;
286         }
287
288         @Override
289         ListenableFuture<?> abort() {
290             return getStep().abort();
291         }
292
293         @Override
294         public String toString() {
295             return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
296         }
297     }
298
299     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
300
301         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
302             super(PreCommit.class, txId, nextStep);
303         }
304
305         @SuppressWarnings("unchecked")
306         @Override
307         ListenableFuture<PostPreCommitStep> process(PreCommit message) {
308             return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
309         }
310
311         @Override
312         CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) {
313             return new PostPreCommit(txId, nextStep);
314         }
315
316     }
317
318     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
319
320         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
321             super(Commit.class, txId, step);
322         }
323
324         @SuppressWarnings("unchecked")
325         @Override
326         ListenableFuture<NoopThreePhaseCommitStep> process(Commit message) {
327             return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
328         }
329
330         @Override
331         CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) {
332             return null;
333         }
334     }
335
336     private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
337     }
338
339     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
340         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
341     }
342 }