Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.concurrent.Executor;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
27 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
28 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
33
34 /**
35  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
36  * decapsulating DataTreeChanged messages and dispatching their context to the user.
37  */
38 final class DataTreeCohortActor extends AbstractUntypedActor {
39     private final Idle idleState = new Idle();
40     private final DOMDataTreeCommitCohort cohort;
41     private final YangInstanceIdentifier registeredPath;
42     private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
43
44     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
45         this.cohort = Objects.requireNonNull(cohort);
46         this.registeredPath = Objects.requireNonNull(registeredPath);
47     }
48
49     @Override
50     protected void handleReceive(final Object message) {
51         if (!(message instanceof CommitProtocolCommand)) {
52             unknownMessage(message);
53             return;
54         }
55
56         CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
57         CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
58
59         LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
60                 currentState, message);
61
62         currentState.handle(command);
63     }
64
65     /**
66      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
67      *
68      * @param <R> Reply message type
69      */
70     abstract static class CommitProtocolCommand<R extends CommitReply> {
71
72         private final TransactionIdentifier txId;
73
74         final TransactionIdentifier getTxId() {
75             return txId;
76         }
77
78         protected CommitProtocolCommand(final TransactionIdentifier txId) {
79             this.txId = Objects.requireNonNull(txId);
80         }
81
82         @Override
83         public String toString() {
84             return getClass().getSimpleName() + " [txId=" + txId + "]";
85         }
86     }
87
88     static final class CanCommit extends CommitProtocolCommand<Success> {
89
90         private final Collection<DOMDataTreeCandidate> candidates;
91         private final ActorRef cohort;
92         private final EffectiveModelContext schema;
93
94         CanCommit(final TransactionIdentifier txId, final Collection<DOMDataTreeCandidate> candidates,
95                 final EffectiveModelContext schema, final ActorRef cohort) {
96             super(txId);
97             this.cohort = Objects.requireNonNull(cohort);
98             this.candidates = Objects.requireNonNull(candidates);
99             this.schema = Objects.requireNonNull(schema);
100         }
101
102         Collection<DOMDataTreeCandidate> getCandidates() {
103             return candidates;
104         }
105
106         EffectiveModelContext getSchema() {
107             return schema;
108         }
109
110         ActorRef getCohort() {
111             return cohort;
112         }
113
114         @Override
115         public String toString() {
116             return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort  + "]";
117         }
118     }
119
120     abstract static class CommitReply {
121
122         private final ActorRef cohortRef;
123         private final TransactionIdentifier txId;
124
125         protected CommitReply(final ActorRef cohortRef, final TransactionIdentifier txId) {
126             this.cohortRef = Objects.requireNonNull(cohortRef);
127             this.txId = Objects.requireNonNull(txId);
128         }
129
130         ActorRef getCohort() {
131             return cohortRef;
132         }
133
134         final TransactionIdentifier getTxId() {
135             return txId;
136         }
137
138         @Override
139         public String toString() {
140             return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
141         }
142     }
143
144     static final class Success extends CommitReply {
145
146         Success(final ActorRef cohortRef, final TransactionIdentifier txId) {
147             super(cohortRef, txId);
148         }
149     }
150
151     static final class PreCommit extends CommitProtocolCommand<Success> {
152
153         PreCommit(final TransactionIdentifier txId) {
154             super(txId);
155         }
156     }
157
158     static final class Abort extends CommitProtocolCommand<Success> {
159
160         Abort(final TransactionIdentifier txId) {
161             super(txId);
162         }
163     }
164
165     static final class Commit extends CommitProtocolCommand<Success> {
166
167         Commit(final TransactionIdentifier txId) {
168             super(txId);
169         }
170     }
171
172     private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
173         private final Class<M> handledMessageType;
174
175         CohortBehaviour(final Class<M> handledMessageType) {
176             this.handledMessageType = Objects.requireNonNull(handledMessageType);
177         }
178
179         void handle(final CommitProtocolCommand<?> command) {
180             if (handledMessageType.isInstance(command)) {
181                 onMessage(command);
182             } else if (command instanceof Abort) {
183                 onAbort(((Abort)command).getTxId());
184             } else {
185                 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
186                         "Unexpected message %s in cohort behavior %s", command.getClass(),
187                         getClass().getSimpleName()))), getSelf());
188             }
189         }
190
191         private void onMessage(final CommitProtocolCommand<?> message) {
192             final ActorRef sender = getSender();
193             TransactionIdentifier txId = message.getTxId();
194             ListenableFuture<S> future = process(handledMessageType.cast(message));
195             Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
196                     : DataTreeCohortActor.this::executeInSelf;
197             Futures.addCallback(future, new FutureCallback<S>() {
198                 @Override
199                 public void onSuccess(final S nextStep) {
200                     success(txId, sender, nextStep);
201                 }
202
203                 @Override
204                 public void onFailure(final Throwable failure) {
205                     failed(txId, sender, failure);
206                 }
207             }, callbackExecutor);
208         }
209
210         private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
211             currentStateMap.remove(txId);
212             sender.tell(new Status.Failure(failure), getSelf());
213         }
214
215         private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
216             currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
217             sender.tell(new Success(getSelf(), txId), getSelf());
218         }
219
220         private void onAbort(final TransactionIdentifier txId) {
221             currentStateMap.remove(txId);
222             final ActorRef sender = getSender();
223             Futures.addCallback(abort(), new FutureCallback<Object>() {
224                 @Override
225                 public void onSuccess(final Object noop) {
226                     sender.tell(new Success(getSelf(), txId), getSelf());
227                 }
228
229                 @Override
230                 public void onFailure(final Throwable failure) {
231                     LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
232                     sender.tell(new Status.Failure(failure), getSelf());
233                 }
234             }, MoreExecutors.directExecutor());
235         }
236
237         abstract @Nullable CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
238
239         abstract @NonNull ListenableFuture<S> process(M command);
240
241         abstract ListenableFuture<?> abort();
242
243         @Override
244         public String toString() {
245             return getClass().getSimpleName();
246         }
247     }
248
249     private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
250         Idle() {
251             super(CanCommit.class);
252         }
253
254         @Override
255         ListenableFuture<PostCanCommitStep> process(final CanCommit message) {
256             return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
257         }
258
259         @Override
260         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
261             return new PostCanCommit(txId, nextStep);
262         }
263
264         @Override
265         ListenableFuture<?> abort() {
266             return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
267         }
268     }
269
270     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
271             N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
272         private final S step;
273         private final TransactionIdentifier txId;
274
275         CohortStateWithStep(final Class<M> handledMessageType, final TransactionIdentifier txId, final S step) {
276             super(handledMessageType);
277             this.txId = Objects.requireNonNull(txId);
278             this.step = Objects.requireNonNull(step);
279         }
280
281         final S getStep() {
282             return step;
283         }
284
285         @Override
286         ListenableFuture<?> abort() {
287             return getStep().abort();
288         }
289
290         @Override
291         public String toString() {
292             return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
293         }
294     }
295
296     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
297
298         PostCanCommit(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
299             super(PreCommit.class, txId, nextStep);
300         }
301
302         @SuppressWarnings("unchecked")
303         @Override
304         ListenableFuture<PostPreCommitStep> process(final PreCommit message) {
305             return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
306         }
307
308         @Override
309         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostPreCommitStep nextStep) {
310             return new PostPreCommit(txId, nextStep);
311         }
312
313     }
314
315     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
316
317         PostPreCommit(final TransactionIdentifier txId, final PostPreCommitStep step) {
318             super(Commit.class, txId, step);
319         }
320
321         @SuppressWarnings("unchecked")
322         @Override
323         ListenableFuture<NoopThreePhaseCommitStep> process(final Commit message) {
324             return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
325         }
326
327         @Override
328         CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final NoopThreePhaseCommitStep nextStep) {
329             return null;
330         }
331     }
332
333     private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
334     }
335
336     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
337         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
338     }
339 }