BUG-5280: switch transactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
16 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
17 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
18 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
19 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
21 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
27  * decapsulating DataTreeChanged messages and dispatching their context to the user.
28  */
29 final class DataTreeCohortActor extends AbstractUntypedActor {
30     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
31     private final CohortBehaviour<?> idleState = new Idle();
32     private final DOMDataTreeCommitCohort cohort;
33     private CohortBehaviour<?> currentState = idleState;
34
35     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
36         this.cohort = Preconditions.checkNotNull(cohort);
37     }
38
39     @Override
40     protected void handleReceive(final Object message) {
41         currentState = currentState.handle(message);
42     }
43
44
45     /**
46      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
47      *
48      * @param <R> Reply message type
49      */
50     static abstract class CommitProtocolCommand<R extends CommitReply> {
51
52         private final String txId;
53
54         final String getTxId() {
55             return txId;
56         }
57
58         protected CommitProtocolCommand(String txId) {
59             this.txId = Preconditions.checkNotNull(txId);
60         }
61     }
62
63     static final class CanCommit extends CommitProtocolCommand<Success> {
64
65         private final DOMDataTreeCandidate candidate;
66         private final ActorRef cohort;
67         private final SchemaContext schema;
68
69         CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
70             super(txId);
71             this.cohort = Preconditions.checkNotNull(cohort);
72             this.candidate = Preconditions.checkNotNull(candidate);
73             this.schema = Preconditions.checkNotNull(schema);
74         }
75
76         DOMDataTreeCandidate getCandidate() {
77             return candidate;
78         }
79
80         SchemaContext getSchema() {
81             return schema;
82         }
83
84         ActorRef getCohort() {
85             return cohort;
86         }
87
88     }
89
90     static abstract class CommitReply {
91
92         private final ActorRef cohortRef;
93         private final String txId;
94
95         protected CommitReply(ActorRef cohortRef, String txId) {
96             this.cohortRef = Preconditions.checkNotNull(cohortRef);
97             this.txId = Preconditions.checkNotNull(txId);
98         }
99
100         ActorRef getCohort() {
101             return cohortRef;
102         }
103
104         final String getTxId() {
105             return txId;
106         }
107
108     }
109
110     static final class Success extends CommitReply {
111
112         public Success(ActorRef cohortRef, String txId) {
113             super(cohortRef, txId);
114         }
115
116     }
117
118     static final class PreCommit extends CommitProtocolCommand<Success> {
119
120         public PreCommit(String txId) {
121             super(txId);
122         }
123     }
124
125     static final class Abort extends CommitProtocolCommand<Success> {
126
127         public Abort(String txId) {
128             super(txId);
129         }
130     }
131
132     static final class Commit extends CommitProtocolCommand<Success> {
133
134         public Commit(String txId) {
135             super(txId);
136         }
137     }
138
139     private static abstract class CohortBehaviour<E> {
140
141         abstract Class<E> getHandledMessageType();
142
143         CohortBehaviour<?> handle(Object message) {
144             if (getHandledMessageType().isInstance(message)) {
145                 return process(getHandledMessageType().cast(message));
146             } else if (message instanceof Abort) {
147                 return abort();
148             }
149             throw new UnsupportedOperationException();
150         }
151
152         abstract CohortBehaviour<?> abort();
153
154         abstract CohortBehaviour<?> process(E message);
155
156     }
157
158     private class Idle extends CohortBehaviour<CanCommit> {
159
160         @Override
161         Class<CanCommit> getHandledMessageType() {
162             return CanCommit.class;
163         }
164
165         @Override
166         CohortBehaviour<?> process(CanCommit message) {
167             final PostCanCommitStep nextStep;
168             try {
169                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
170             } catch (final Exception e) {
171                 getSender().tell(new Status.Failure(e), getSelf());
172                 return this;
173             }
174             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
175             return new PostCanCommit(message.getTxId(), nextStep);
176         }
177
178         @Override
179         CohortBehaviour<?> abort() {
180             return this;
181         }
182
183     }
184
185
186     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
187             extends CohortBehaviour<M> {
188
189         private final S step;
190         private final String txId;
191
192         CohortStateWithStep(String txId, S step) {
193             this.txId = Preconditions.checkNotNull(txId);
194             this.step = Preconditions.checkNotNull(step);
195         }
196
197         final S getStep() {
198             return step;
199         }
200
201         final String getTxId() {
202             return txId;
203         }
204
205         @Override
206         final CohortBehaviour<?> abort() {
207             try {
208                 getStep().abort().get();
209             } catch (final Exception e) {
210                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
211                 getSender().tell(new Status.Failure(e), getSelf());
212                 return idleState;
213             }
214             getSender().tell(new Success(getSelf(), txId), getSelf());
215             return idleState;
216         }
217
218     }
219
220     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
221
222         PostCanCommit(String txId, PostCanCommitStep nextStep) {
223             super(txId, nextStep);
224         }
225
226         @Override
227         Class<PreCommit> getHandledMessageType() {
228             return PreCommit.class;
229         }
230
231         @Override
232         CohortBehaviour<?> process(PreCommit message) {
233             final PostPreCommitStep nextStep;
234             try {
235                 nextStep = getStep().preCommit().get();
236             } catch (final Exception e) {
237                 getSender().tell(new Status.Failure(e), getSelf());
238                 return idleState;
239             }
240             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
241             return new PostPreCommit(getTxId(), nextStep);
242         }
243
244     }
245
246     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
247
248         PostPreCommit(String txId, PostPreCommitStep step) {
249             super(txId, step);
250         }
251
252         @Override
253         CohortBehaviour<?> process(Commit message) {
254             try {
255                 getStep().commit().get();
256             } catch (final Exception e) {
257                 getSender().tell(new Status.Failure(e), getSender());
258                 return idleState;
259             }
260             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
261             return idleState;
262         }
263
264         @Override
265         Class<Commit> getHandledMessageType() {
266             return Commit.class;
267         }
268
269     }
270
271     static Props props(final DOMDataTreeCommitCohort cohort) {
272         return Props.create(DataTreeCohortActor.class, cohort);
273     }
274 }