f24d0ee8a801efaa38ee93d692e10cd0bbcca6fa
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.primitives.UnsignedLong;
14 import com.google.common.util.concurrent.FutureCallback;
15 import java.util.Collection;
16 import java.util.Optional;
17 import javax.annotation.concurrent.NotThreadSafe;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
23 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
25 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
26 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
29 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
38 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
41 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.RequestException;
43 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
44 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
45 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Frontend read-write transaction state as observed by the shard leader.
54  *
55  * @author Robert Varga
56  */
57 @NotThreadSafe
58 final class FrontendReadWriteTransaction extends FrontendTransaction {
59     private enum CommitStage {
60         READY,
61         CAN_COMMIT_PENDING,
62         CAN_COMMIT_COMPLETE,
63         PRE_COMMIT_PENDING,
64         PRE_COMMIT_COMPLETE,
65         COMMIT_PENDING,
66     }
67
68     private abstract static class State {
69         @Override
70         public abstract String toString();
71     }
72
73     private static final class Failed extends State {
74         final RequestException cause;
75
76         Failed(final RequestException cause) {
77             this.cause = requireNonNull(cause);
78         }
79
80         @Override
81         public String toString() {
82             return "FAILED (" + cause.getMessage() + ")";
83         }
84     }
85
86     private static final class Open extends State {
87         final ReadWriteShardDataTreeTransaction openTransaction;
88
89         Open(final ReadWriteShardDataTreeTransaction openTransaction) {
90             this.openTransaction = requireNonNull(openTransaction);
91         }
92
93         @Override
94         public String toString() {
95             return "OPEN";
96         }
97     }
98
99     private static final class Ready extends State {
100         final ShardDataTreeCohort readyCohort;
101         CommitStage stage;
102
103         Ready(final ShardDataTreeCohort readyCohort) {
104             this.readyCohort = requireNonNull(readyCohort);
105             this.stage = CommitStage.READY;
106         }
107
108         @Override
109         public String toString() {
110             return "READY (" + stage + ")";
111         }
112     }
113
114     private static final class Sealed extends State {
115         final DataTreeModification sealedModification;
116
117         Sealed(final DataTreeModification sealedModification) {
118             this.sealedModification = requireNonNull(sealedModification);
119         }
120
121         @Override
122         public String toString() {
123             return "SEALED";
124         }
125     }
126
127     /**
128      * Retired state, needed to catch and suppress callbacks after we have removed associated state.
129      */
130     private static final class Retired extends State {
131         private final String prevStateString;
132
133         Retired(final State prevState) {
134             prevStateString = prevState.toString();
135         }
136
137         @Override
138         public String toString() {
139             return "RETIRED (in " + prevStateString + ")";
140         }
141     }
142
143     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
144     private static final State ABORTED = new State() {
145         @Override
146         public String toString() {
147             return "ABORTED";
148         }
149     };
150     private static final State ABORTING = new State() {
151         @Override
152         public String toString() {
153             return "ABORTING";
154         }
155     };
156     private static final State COMMITTED = new State() {
157         @Override
158         public String toString() {
159             return "COMMITTED";
160         }
161     };
162
163     private State state;
164
165     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
166             final ReadWriteShardDataTreeTransaction transaction) {
167         super(history, id);
168         this.state = new Open(transaction);
169     }
170
171     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
172             final DataTreeModification mod) {
173         super(history, id);
174         this.state = new Sealed(mod);
175     }
176
177     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
178             final ReadWriteShardDataTreeTransaction transaction) {
179         return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
180     }
181
182     static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
183             final TransactionIdentifier id, final DataTreeModification mod) {
184         return new FrontendReadWriteTransaction(history, id, mod);
185     }
186
187     // Sequence has already been checked
188     @Override
189     TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
190             final long now) throws RequestException {
191         if (request instanceof ModifyTransactionRequest) {
192             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
193         } else if (request instanceof CommitLocalTransactionRequest) {
194             handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
195             return null;
196         } else if (request instanceof ExistsTransactionRequest) {
197             return handleExistsTransaction((ExistsTransactionRequest) request);
198         } else if (request instanceof ReadTransactionRequest) {
199             return handleReadTransaction((ReadTransactionRequest) request);
200         } else if (request instanceof TransactionPreCommitRequest) {
201             handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
202             return null;
203         } else if (request instanceof TransactionDoCommitRequest) {
204             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
205             return null;
206         } else if (request instanceof TransactionAbortRequest) {
207             return handleTransactionAbort(request.getSequence(), envelope, now);
208         } else if (request instanceof AbortLocalTransactionRequest) {
209             handleLocalTransactionAbort(request.getSequence(), envelope, now);
210             return null;
211         } else {
212             LOG.warn("Rejecting unsupported request {}", request);
213             throw new UnsupportedRequestException(request);
214         }
215     }
216
217     @Override
218     void retire() {
219         state = new Retired(state);
220     }
221
222     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
223             final RequestEnvelope envelope, final long now) throws RequestException {
224         throwIfFailed();
225
226         final Ready ready = checkReady();
227         switch (ready.stage) {
228             case PRE_COMMIT_PENDING:
229                 LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
230                 break;
231             case CAN_COMMIT_COMPLETE:
232                 ready.stage = CommitStage.PRE_COMMIT_PENDING;
233                 LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
234                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
235                     @Override
236                     public void onSuccess(final DataTreeCandidate result) {
237                         successfulPreCommit(envelope, now);
238                     }
239
240                     @Override
241                     public void onFailure(final Throwable failure) {
242                         failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
243                     }
244                 });
245                 break;
246             case CAN_COMMIT_PENDING:
247             case COMMIT_PENDING:
248             case PRE_COMMIT_COMPLETE:
249             case READY:
250                 throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
251             default:
252                 throwUnhandledCommitStage(ready);
253         }
254     }
255
256     void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
257         if (state instanceof Retired) {
258             LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
259                 getIdentifier());
260             return;
261         }
262
263         final Ready ready = checkReady();
264         LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
265         recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
266             envelope.getMessage().getSequence()));
267         ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
268     }
269
270     void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
271         if (state instanceof Retired) {
272             LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
273             return;
274         }
275
276         recordAndSendFailure(envelope, now, cause);
277         state = new Failed(cause);
278         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
279     }
280
281     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
282             final long now) throws RequestException {
283         throwIfFailed();
284
285         final Ready ready = checkReady();
286         switch (ready.stage) {
287             case COMMIT_PENDING:
288                 LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
289                 break;
290             case PRE_COMMIT_COMPLETE:
291                 ready.stage = CommitStage.COMMIT_PENDING;
292                 LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
293                 ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
294                     @Override
295                     public void onSuccess(final UnsignedLong result) {
296                         successfulCommit(envelope, now);
297                     }
298
299                     @Override
300                     public void onFailure(final Throwable failure) {
301                         failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
302                     }
303                 });
304                 break;
305             case CAN_COMMIT_COMPLETE:
306             case CAN_COMMIT_PENDING:
307             case PRE_COMMIT_PENDING:
308             case READY:
309                 throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
310             default:
311                 throwUnhandledCommitStage(ready);
312         }
313     }
314
315     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
316         checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
317             sequence)));
318     }
319
320     private void startAbort() {
321         state = ABORTING;
322         LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
323     }
324
325     private void finishAbort() {
326         state = ABORTED;
327         LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
328     }
329
330     private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
331             final long now) {
332         if (state instanceof Open) {
333             final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
334             startAbort();
335             openTransaction.abort(() -> {
336                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
337                     sequence));
338                 finishAbort();
339             });
340             return null;
341         }
342         if (ABORTING.equals(state)) {
343             LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
344             return null;
345         }
346         if (ABORTED.equals(state)) {
347             // We should have recorded the reply
348             LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
349             return new TransactionAbortSuccess(getIdentifier(), sequence);
350         }
351
352         final Ready ready = checkReady();
353         startAbort();
354         ready.readyCohort.abort(new FutureCallback<Void>() {
355             @Override
356             public void onSuccess(final Void result) {
357                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
358                 finishAbort();
359             }
360
361             @Override
362             public void onFailure(final Throwable failure) {
363                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
364                 LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
365                 finishAbort();
366             }
367         });
368         return null;
369     }
370
371     private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
372         throwIfFailed();
373
374         final Ready ready = checkReady();
375         switch (ready.stage) {
376             case CAN_COMMIT_PENDING:
377                 LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
378                 break;
379             case READY:
380                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
381                 LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
382                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
383                     @Override
384                     public void onSuccess(final Void result) {
385                         successfulCanCommit(envelope, now);
386                     }
387
388                     @Override
389                     public void onFailure(final Throwable failure) {
390                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
391                     }
392                 });
393                 break;
394             case CAN_COMMIT_COMPLETE:
395             case COMMIT_PENDING:
396             case PRE_COMMIT_COMPLETE:
397             case PRE_COMMIT_PENDING:
398                 throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
399             default:
400                 throwUnhandledCommitStage(ready);
401         }
402     }
403
404     void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
405         if (state instanceof Retired) {
406             LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
407                 getIdentifier());
408             return;
409         }
410
411         final Ready ready = checkReady();
412         recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
413             envelope.getMessage().getSequence()));
414         ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
415         LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
416     }
417
418     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
419         throwIfFailed();
420
421         final Ready ready = checkReady();
422         switch (ready.stage) {
423             case CAN_COMMIT_COMPLETE:
424             case CAN_COMMIT_PENDING:
425             case COMMIT_PENDING:
426             case PRE_COMMIT_COMPLETE:
427             case PRE_COMMIT_PENDING:
428                 LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
429                     getIdentifier(), state, envelope);
430                 break;
431             case READY:
432                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
433                 LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
434                 ready.readyCohort.canCommit(new FutureCallback<Void>() {
435                     @Override
436                     public void onSuccess(final Void result) {
437                         successfulDirectCanCommit(envelope, now);
438                     }
439
440                     @Override
441                     public void onFailure(final Throwable failure) {
442                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
443                     }
444                 });
445                 break;
446             default:
447                 throwUnhandledCommitStage(ready);
448         }
449     }
450
451     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
452         if (state instanceof Retired) {
453             LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
454             return;
455         }
456
457         final Ready ready = checkReady();
458         ready.stage = CommitStage.PRE_COMMIT_PENDING;
459         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
460         ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
461             @Override
462             public void onSuccess(final DataTreeCandidate result) {
463                 successfulDirectPreCommit(envelope, startTime);
464             }
465
466             @Override
467             public void onFailure(final Throwable failure) {
468                 failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
469             }
470         });
471     }
472
473     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
474         if (state instanceof Retired) {
475             LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
476             return;
477         }
478
479         final Ready ready = checkReady();
480         ready.stage = CommitStage.COMMIT_PENDING;
481         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
482         ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
483             @Override
484             public void onSuccess(final UnsignedLong result) {
485                 successfulCommit(envelope, startTime);
486             }
487
488             @Override
489             public void onFailure(final Throwable failure) {
490                 failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
491             }
492         });
493     }
494
495     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
496         if (state instanceof Retired) {
497             LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
498             return;
499         }
500
501         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
502             envelope.getMessage().getSequence()));
503         state = COMMITTED;
504     }
505
506     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
507             final RequestEnvelope envelope, final long now) throws RequestException {
508         final DataTreeModification sealedModification = checkSealed();
509         if (!sealedModification.equals(request.getModification())) {
510             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
511             throw new UnsupportedRequestException(request);
512         }
513
514         final Optional<Exception> optFailure = request.getDelayedFailure();
515         if (optFailure.isPresent()) {
516             state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
517         } else {
518             state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, Optional.empty()));
519         }
520
521         if (request.isCoordinated()) {
522             coordinatedCommit(envelope, now);
523         } else {
524             directCommit(envelope, now);
525         }
526     }
527
528     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) {
529         final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
530         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
531             data.isPresent()));
532     }
533
534     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) {
535         final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
536         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
537             data));
538     }
539
540     private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
541         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
542     }
543
544     private void applyModifications(final Collection<TransactionModification> modifications) {
545         if (!modifications.isEmpty()) {
546             final DataTreeModification modification = checkOpen().getSnapshot();
547             for (TransactionModification m : modifications) {
548                 if (m instanceof TransactionDelete) {
549                     modification.delete(m.getPath());
550                 } else if (m instanceof TransactionWrite) {
551                     modification.write(m.getPath(), ((TransactionWrite) m).getData());
552                 } else if (m instanceof TransactionMerge) {
553                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
554                 } else {
555                     LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
556                 }
557             }
558         }
559     }
560
561     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
562             final RequestEnvelope envelope, final long now) throws RequestException {
563         // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
564         // protocol, there is nothing for us to do.
565         final Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
566         if (!maybeProto.isPresent()) {
567             applyModifications(request.getModifications());
568             return replyModifySuccess(request.getSequence());
569         }
570
571         switch (maybeProto.get()) {
572             case ABORT:
573                 if (ABORTING.equals(state)) {
574                     LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
575                     return null;
576                 }
577                 final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
578                 startAbort();
579                 openTransaction.abort(() -> {
580                     recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
581                         request.getSequence()));
582                     finishAbort();
583                 });
584                 return null;
585             case READY:
586                 ensureReady(request.getModifications());
587                 return replyModifySuccess(request.getSequence());
588             case SIMPLE:
589                 ensureReady(request.getModifications());
590                 directCommit(envelope, now);
591                 return null;
592             case THREE_PHASE:
593                 ensureReady(request.getModifications());
594                 coordinatedCommit(envelope, now);
595                 return null;
596             default:
597                 LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
598                 throw new UnsupportedRequestException(request);
599         }
600     }
601
602     private void ensureReady(final Collection<TransactionModification> modifications) {
603         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
604         // only once.
605         if (state instanceof Ready) {
606             LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
607             return;
608         }
609
610         applyModifications(modifications);
611         state = new Ready(checkOpen().ready(Optional.empty()));
612         LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
613     }
614
615     private void throwIfFailed() throws RequestException {
616         if (state instanceof Failed) {
617             LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
618             throw ((Failed) state).cause;
619         }
620     }
621
622     private ReadWriteShardDataTreeTransaction checkOpen() {
623         checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), state);
624         return ((Open) state).openTransaction;
625     }
626
627     private Ready checkReady() {
628         checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), state);
629         return (Ready) state;
630     }
631
632     private DataTreeModification checkSealed() {
633         checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), state);
634         return ((Sealed) state).sealedModification;
635     }
636
637     private static void throwUnhandledCommitStage(final Ready ready) {
638         throw new IllegalStateException("Unhandled commit stage " + ready.stage);
639     }
640 }