9c39d823fe5536a222ef3a99fa2abdac7b70d677
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.primitives.UnsignedLong;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.Collection;
14 import java.util.Optional;
15 import javax.annotation.Nullable;
16 import javax.annotation.concurrent.NotThreadSafe;
17 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
18 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
21 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
23 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
24 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
25 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
26 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
31 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
33 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
34 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
36 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
38 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
39 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
40 import org.opendaylight.controller.cluster.access.concepts.RequestException;
41 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
42 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
43 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
46 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Frontend read-write transaction state as observed by the shard leader.
52  *
53  * @author Robert Varga
54  */
55 @NotThreadSafe
56 final class FrontendReadWriteTransaction extends FrontendTransaction {
57     private enum CommitStage {
58         READY,
59         CAN_COMMIT_PENDING,
60         CAN_COMMIT_COMPLETE,
61         PRE_COMMIT_PENDING,
62         PRE_COMMIT_COMPLETE,
63         COMMIT_PENDING,
64     }
65
66     private abstract static class State {
67         @Override
68         public abstract String toString();
69     }
70
71     private static final class Failed extends State {
72         final RequestException cause;
73
74         Failed(final RequestException cause) {
75             this.cause = Preconditions.checkNotNull(cause);
76         }
77
78         @Override
79         public String toString() {
80             return "FAILED (" + cause.getMessage() + ")";
81         }
82     }
83
84     private static final class Open extends State {
85         final ReadWriteShardDataTreeTransaction openTransaction;
86
87         Open(final ReadWriteShardDataTreeTransaction openTransaction) {
88             this.openTransaction = Preconditions.checkNotNull(openTransaction);
89         }
90
91         @Override
92         public String toString() {
93             return "OPEN";
94         }
95     }
96
97     private static final class Ready extends State {
98         final ShardDataTreeCohort readyCohort;
99         CommitStage stage;
100
101         Ready(final ShardDataTreeCohort readyCohort) {
102             this.readyCohort = Preconditions.checkNotNull(readyCohort);
103             this.stage = CommitStage.READY;
104         }
105
106         @Override
107         public String toString() {
108             return "READY (" + stage + ")";
109         }
110     }
111
112     private static final class Sealed extends State {
113         final DataTreeModification sealedModification;
114
115         Sealed(final DataTreeModification sealedModification) {
116             this.sealedModification = Preconditions.checkNotNull(sealedModification);
117         }
118
119         @Override
120         public String toString() {
121             return "SEALED";
122         }
123     }
124
125     /**
126      * Retired state, needed to catch and suppress callbacks after we have removed associated state.
127      */
128     private static final class Retired extends State {
129         private final String prevStateString;
130
131         Retired(final State prevState) {
132             prevStateString = prevState.toString();
133         }
134
135         @Override
136         public String toString() {
137             return "RETIRED (in " + prevStateString + ")";
138         }
139     }
140
141     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
142     private static final State ABORTED = new State() {
143         @Override
144         public String toString() {
145             return "ABORTED";
146         }
147     };
148     private static final State ABORTING = new State() {
149         @Override
150         public String toString() {
151             return "ABORTING";
152         }
153     };
154     private static final State COMMITTED = new State() {
155         @Override
156         public String toString() {
157             return "COMMITTED";
158         }
159     };
160
161     private State state;
162
163     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
164             final ReadWriteShardDataTreeTransaction transaction) {
165         super(history, id);
166         this.state = new Open(transaction);
167     }
168
169     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
170             final DataTreeModification mod) {
171         super(history, id);
172         this.state = new Sealed(mod);
173     }
174
175     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
176             final ReadWriteShardDataTreeTransaction transaction) {
177         return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
178     }
179
180     static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
181             final TransactionIdentifier id, final DataTreeModification mod) {
182         return new FrontendReadWriteTransaction(history, id, mod);
183     }
184
185     // Sequence has already been checked
186     @Override
187     @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
188             final long now) throws RequestException {
189         if (request instanceof ModifyTransactionRequest) {
190             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
191         } else if (request instanceof CommitLocalTransactionRequest) {
192             handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
193             return null;
194         } else if (request instanceof ExistsTransactionRequest) {
195             return handleExistsTransaction((ExistsTransactionRequest) request);
196         } else if (request instanceof ReadTransactionRequest) {
197             return handleReadTransaction((ReadTransactionRequest) request);
198         } else if (request instanceof TransactionPreCommitRequest) {
199             handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
200             return null;
201         } else if (request instanceof TransactionDoCommitRequest) {
202             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
203             return null;
204         } else if (request instanceof TransactionAbortRequest) {
205             return handleTransactionAbort(request.getSequence(), envelope, now);
206         } else if (request instanceof AbortLocalTransactionRequest) {
207             handleLocalTransactionAbort(request.getSequence(), envelope, now);
208             return null;
209         } else {
210             LOG.warn("Rejecting unsupported request {}", request);
211             throw new UnsupportedRequestException(request);
212         }
213     }
214
215     @Override
216     void retire() {
217         state = new Retired(state);
218     }
219
220     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
221             final RequestEnvelope envelope, final long now) throws RequestException {
222         throwIfFailed();
223
224         final Ready ready = checkReady();
225         switch (ready.stage) {
226             case PRE_COMMIT_PENDING:
227                 LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
228                 break;
229             case CAN_COMMIT_COMPLETE:
230                 ready.stage = CommitStage.PRE_COMMIT_PENDING;
231                 LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
232                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
233                     @Override
234                     public void onSuccess(final DataTreeCandidate result) {
235                         successfulPreCommit(envelope, now);
236                     }
237
238                     @Override
239                     public void onFailure(final Throwable failure) {
240                         failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
241                     }
242                 });
243                 break;
244             case CAN_COMMIT_PENDING:
245             case COMMIT_PENDING:
246             case PRE_COMMIT_COMPLETE:
247             case READY:
248                 throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
249             default:
250                 throwUnhandledCommitStage(ready);
251         }
252     }
253
254     void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
255         if (state instanceof Retired) {
256             LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
257                 getIdentifier());
258             return;
259         }
260
261         final Ready ready = checkReady();
262         LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
263         recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
264             envelope.getMessage().getSequence()));
265         ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
266     }
267
268     void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
269         if (state instanceof Retired) {
270             LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
271             return;
272         }
273
274         recordAndSendFailure(envelope, now, cause);
275         state = new Failed(cause);
276         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
277     }
278
279     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
280             final long now) throws RequestException {
281         throwIfFailed();
282
283         final Ready ready = checkReady();
284         switch (ready.stage) {
285             case COMMIT_PENDING:
286                 LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
287                 break;
288             case PRE_COMMIT_COMPLETE:
289                 ready.stage = CommitStage.COMMIT_PENDING;
290                 LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
291                 ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
292                     @Override
293                     public void onSuccess(final UnsignedLong result) {
294                         successfulCommit(envelope, now);
295                     }
296
297                     @Override
298                     public void onFailure(final Throwable failure) {
299                         failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
300                     }
301                 });
302                 break;
303             case CAN_COMMIT_COMPLETE:
304             case CAN_COMMIT_PENDING:
305             case PRE_COMMIT_PENDING:
306             case READY:
307                 throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
308             default:
309                 throwUnhandledCommitStage(ready);
310         }
311     }
312
313     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
314         checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
315             sequence)));
316     }
317
318     private void startAbort() {
319         state = ABORTING;
320         LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
321     }
322
323     private void finishAbort() {
324         state = ABORTED;
325         LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
326     }
327
328     private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
329             final long now) {
330         if (state instanceof Open) {
331             final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
332             startAbort();
333             openTransaction.abort(() -> {
334                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
335                     sequence));
336                 finishAbort();
337             });
338             return null;
339         }
340         if (ABORTING.equals(state)) {
341             LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
342             return null;
343         }
344         if (ABORTED.equals(state)) {
345             // We should have recorded the reply
346             LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
347             return new TransactionAbortSuccess(getIdentifier(), sequence);
348         }
349
350         final Ready ready = checkReady();
351         startAbort();
352         ready.readyCohort.abort(new FutureCallback<Void>() {
353             @Override
354             public void onSuccess(final Void result) {
355                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
356                 finishAbort();
357             }
358
359             @Override
360             public void onFailure(final Throwable failure) {
361                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
362                 LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
363                 finishAbort();
364             }
365         });
366         return null;
367     }
368
369     private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
370         throwIfFailed();
371
372         final Ready ready = checkReady();
373         switch (ready.stage) {
374             case CAN_COMMIT_PENDING:
375                 LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
376                 break;
377             case READY:
378                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
379                 LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
380                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
381                     @Override
382                     public void onSuccess(final Void result) {
383                         successfulCanCommit(envelope, now);
384                     }
385
386                     @Override
387                     public void onFailure(final Throwable failure) {
388                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
389                     }
390                 });
391                 break;
392             case CAN_COMMIT_COMPLETE:
393             case COMMIT_PENDING:
394             case PRE_COMMIT_COMPLETE:
395             case PRE_COMMIT_PENDING:
396                 throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
397             default:
398                 throwUnhandledCommitStage(ready);
399         }
400     }
401
402     void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
403         if (state instanceof Retired) {
404             LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
405                 getIdentifier());
406             return;
407         }
408
409         final Ready ready = checkReady();
410         recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
411             envelope.getMessage().getSequence()));
412         ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
413         LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
414     }
415
416     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
417         throwIfFailed();
418
419         final Ready ready = checkReady();
420         switch (ready.stage) {
421             case CAN_COMMIT_COMPLETE:
422             case CAN_COMMIT_PENDING:
423             case COMMIT_PENDING:
424             case PRE_COMMIT_COMPLETE:
425             case PRE_COMMIT_PENDING:
426                 LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
427                     getIdentifier(), state, envelope);
428                 break;
429             case READY:
430                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
431                 LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
432                 ready.readyCohort.canCommit(new FutureCallback<Void>() {
433                     @Override
434                     public void onSuccess(final Void result) {
435                         successfulDirectCanCommit(envelope, now);
436                     }
437
438                     @Override
439                     public void onFailure(final Throwable failure) {
440                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
441                     }
442                 });
443                 break;
444             default:
445                 throwUnhandledCommitStage(ready);
446         }
447     }
448
449     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
450         if (state instanceof Retired) {
451             LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
452             return;
453         }
454
455         final Ready ready = checkReady();
456         ready.stage = CommitStage.PRE_COMMIT_PENDING;
457         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
458         ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
459             @Override
460             public void onSuccess(final DataTreeCandidate result) {
461                 successfulDirectPreCommit(envelope, startTime);
462             }
463
464             @Override
465             public void onFailure(final Throwable failure) {
466                 failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
467             }
468         });
469     }
470
471     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
472         if (state instanceof Retired) {
473             LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
474             return;
475         }
476
477         final Ready ready = checkReady();
478         ready.stage = CommitStage.COMMIT_PENDING;
479         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
480         ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
481             @Override
482             public void onSuccess(final UnsignedLong result) {
483                 successfulCommit(envelope, startTime);
484             }
485
486             @Override
487             public void onFailure(final Throwable failure) {
488                 failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
489             }
490         });
491     }
492
493     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
494         if (state instanceof Retired) {
495             LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
496             return;
497         }
498
499         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
500             envelope.getMessage().getSequence()));
501         state = COMMITTED;
502     }
503
504     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
505             final RequestEnvelope envelope, final long now) throws RequestException {
506         final DataTreeModification sealedModification = checkSealed();
507         if (!sealedModification.equals(request.getModification())) {
508             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
509             throw new UnsupportedRequestException(request);
510         }
511
512         final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
513         if (optFailure.isPresent()) {
514             state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
515         } else {
516             state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification,
517                     java.util.Optional.empty()));
518         }
519
520         if (request.isCoordinated()) {
521             coordinatedCommit(envelope, now);
522         } else {
523             directCommit(envelope, now);
524         }
525     }
526
527     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
528             throws RequestException {
529         final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
530         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
531             data.isPresent()));
532     }
533
534     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
535             throws RequestException {
536         final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
537         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
538             data));
539     }
540
541     private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
542         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
543     }
544
545     private void applyModifications(final Collection<TransactionModification> modifications) {
546         if (!modifications.isEmpty()) {
547             final DataTreeModification modification = checkOpen().getSnapshot();
548             for (TransactionModification m : modifications) {
549                 if (m instanceof TransactionDelete) {
550                     modification.delete(m.getPath());
551                 } else if (m instanceof TransactionWrite) {
552                     modification.write(m.getPath(), ((TransactionWrite) m).getData());
553                 } else if (m instanceof TransactionMerge) {
554                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
555                 } else {
556                     LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
557                 }
558             }
559         }
560     }
561
562     @Nullable
563     private TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
564             final RequestEnvelope envelope, final long now) throws RequestException {
565         // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
566         // protocol, there is nothing for us to do.
567         final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
568         if (!maybeProto.isPresent()) {
569             applyModifications(request.getModifications());
570             return replyModifySuccess(request.getSequence());
571         }
572
573         switch (maybeProto.get()) {
574             case ABORT:
575                 if (ABORTING.equals(state)) {
576                     LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
577                     return null;
578                 }
579                 final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
580                 startAbort();
581                 openTransaction.abort(() -> {
582                     recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
583                         request.getSequence()));
584                     finishAbort();
585                 });
586                 return null;
587             case READY:
588                 ensureReady(request.getModifications());
589                 return replyModifySuccess(request.getSequence());
590             case SIMPLE:
591                 ensureReady(request.getModifications());
592                 directCommit(envelope, now);
593                 return null;
594             case THREE_PHASE:
595                 ensureReady(request.getModifications());
596                 coordinatedCommit(envelope, now);
597                 return null;
598             default:
599                 LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
600                 throw new UnsupportedRequestException(request);
601         }
602     }
603
604     private void ensureReady(final Collection<TransactionModification> modifications) {
605         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
606         // only once.
607         if (state instanceof Ready) {
608             LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
609             return;
610         }
611
612         applyModifications(modifications);
613         state = new Ready(checkOpen().ready(java.util.Optional.empty()));
614         LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
615     }
616
617     private void throwIfFailed() throws RequestException {
618         if (state instanceof Failed) {
619             LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
620             throw ((Failed) state).cause;
621         }
622     }
623
624     private ReadWriteShardDataTreeTransaction checkOpen() {
625         Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(),
626             state);
627         return ((Open) state).openTransaction;
628     }
629
630     private Ready checkReady() {
631         Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(),
632             state);
633         return (Ready) state;
634     }
635
636     private DataTreeModification checkSealed() {
637         Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(),
638             state);
639         return ((Sealed) state).sealedModification;
640     }
641
642     private static void throwUnhandledCommitStage(final Ready ready) {
643         throw new IllegalStateException("Unhandled commit stage " + ready.stage);
644     }
645 }