Add OnDemandShardState to report additional Shard state
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
29  * decapsulating DataTreeChanged messages and dispatching their context to the user.
30  */
31 final class DataTreeCohortActor extends AbstractUntypedActor {
32     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
33     private final CohortBehaviour<?> idleState = new Idle();
34     private final DOMDataTreeCommitCohort cohort;
35     private final YangInstanceIdentifier registeredPath;
36     private CohortBehaviour<?> currentState = idleState;
37
38     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
39         this.cohort = Preconditions.checkNotNull(cohort);
40         this.registeredPath = Preconditions.checkNotNull(registeredPath);
41     }
42
43     @Override
44     protected void handleReceive(final Object message) {
45         currentState = currentState.handle(message);
46     }
47
48
49     /**
50      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
51      *
52      * @param <R> Reply message type
53      */
54     static abstract class CommitProtocolCommand<R extends CommitReply> {
55
56         private final TransactionIdentifier txId;
57
58         final TransactionIdentifier getTxId() {
59             return txId;
60         }
61
62         protected CommitProtocolCommand(TransactionIdentifier txId) {
63             this.txId = Preconditions.checkNotNull(txId);
64         }
65     }
66
67     static final class CanCommit extends CommitProtocolCommand<Success> {
68
69         private final DOMDataTreeCandidate candidate;
70         private final ActorRef cohort;
71         private final SchemaContext schema;
72
73         CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
74             super(txId);
75             this.cohort = Preconditions.checkNotNull(cohort);
76             this.candidate = Preconditions.checkNotNull(candidate);
77             this.schema = Preconditions.checkNotNull(schema);
78         }
79
80         DOMDataTreeCandidate getCandidate() {
81             return candidate;
82         }
83
84         SchemaContext getSchema() {
85             return schema;
86         }
87
88         ActorRef getCohort() {
89             return cohort;
90         }
91
92     }
93
94     static abstract class CommitReply {
95
96         private final ActorRef cohortRef;
97         private final TransactionIdentifier txId;
98
99         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
100             this.cohortRef = Preconditions.checkNotNull(cohortRef);
101             this.txId = Preconditions.checkNotNull(txId);
102         }
103
104         ActorRef getCohort() {
105             return cohortRef;
106         }
107
108         final TransactionIdentifier getTxId() {
109             return txId;
110         }
111     }
112
113     static final class Success extends CommitReply {
114
115         public Success(ActorRef cohortRef, TransactionIdentifier txId) {
116             super(cohortRef, txId);
117         }
118
119     }
120
121     static final class PreCommit extends CommitProtocolCommand<Success> {
122
123         public PreCommit(TransactionIdentifier txId) {
124             super(txId);
125         }
126     }
127
128     static final class Abort extends CommitProtocolCommand<Success> {
129
130         public Abort(TransactionIdentifier txId) {
131             super(txId);
132         }
133     }
134
135     static final class Commit extends CommitProtocolCommand<Success> {
136
137         public Commit(TransactionIdentifier txId) {
138             super(txId);
139         }
140     }
141
142     private static abstract class CohortBehaviour<E> {
143
144         abstract Class<E> getHandledMessageType();
145
146         CohortBehaviour<?> handle(Object message) {
147             if (getHandledMessageType().isInstance(message)) {
148                 return process(getHandledMessageType().cast(message));
149             } else if (message instanceof Abort) {
150                 return abort();
151             }
152             throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
153                     message.getClass(), getClass().getSimpleName()));
154         }
155
156         abstract CohortBehaviour<?> abort();
157
158         abstract CohortBehaviour<?> process(E message);
159
160     }
161
162     private class Idle extends CohortBehaviour<CanCommit> {
163
164         @Override
165         Class<CanCommit> getHandledMessageType() {
166             return CanCommit.class;
167         }
168
169         @Override
170         CohortBehaviour<?> process(CanCommit message) {
171             final PostCanCommitStep nextStep;
172             try {
173                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
174             } catch (final Exception e) {
175                 getSender().tell(new Status.Failure(e), getSelf());
176                 return this;
177             }
178             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
179             return new PostCanCommit(message.getTxId(), nextStep);
180         }
181
182         @Override
183         CohortBehaviour<?> abort() {
184             return this;
185         }
186
187     }
188
189
190     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
191             extends CohortBehaviour<M> {
192
193         private final S step;
194         private final TransactionIdentifier txId;
195
196         CohortStateWithStep(TransactionIdentifier txId, S step) {
197             this.txId = Preconditions.checkNotNull(txId);
198             this.step = Preconditions.checkNotNull(step);
199         }
200
201         final S getStep() {
202             return step;
203         }
204
205         final TransactionIdentifier getTxId() {
206             return txId;
207         }
208
209         @Override
210         final CohortBehaviour<?> abort() {
211             try {
212                 getStep().abort().get();
213             } catch (final Exception e) {
214                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
215                 getSender().tell(new Status.Failure(e), getSelf());
216                 return idleState;
217             }
218             getSender().tell(new Success(getSelf(), txId), getSelf());
219             return idleState;
220         }
221
222     }
223
224     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
225
226         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
227             super(txId, nextStep);
228         }
229
230         @Override
231         Class<PreCommit> getHandledMessageType() {
232             return PreCommit.class;
233         }
234
235         @Override
236         CohortBehaviour<?> process(PreCommit message) {
237             final PostPreCommitStep nextStep;
238             try {
239                 nextStep = getStep().preCommit().get();
240             } catch (final Exception e) {
241                 getSender().tell(new Status.Failure(e), getSelf());
242                 return idleState;
243             }
244             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
245             return new PostPreCommit(getTxId(), nextStep);
246         }
247
248     }
249
250     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
251
252         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
253             super(txId, step);
254         }
255
256         @Override
257         CohortBehaviour<?> process(Commit message) {
258             try {
259                 getStep().commit().get();
260             } catch (final Exception e) {
261                 getSender().tell(new Status.Failure(e), getSender());
262                 return idleState;
263             }
264             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
265             return idleState;
266         }
267
268         @Override
269         Class<Commit> getHandledMessageType() {
270             return Commit.class;
271         }
272
273     }
274
275     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
276         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
277     }
278 }