Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.primitives.UnsignedLong;
14 import com.google.common.util.concurrent.FutureCallback;
15 import java.util.Collection;
16 import java.util.Optional;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
22 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
24 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
25 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
26 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
27 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
29 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
34 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
35 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
37 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
39 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
40 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
41 import org.opendaylight.controller.cluster.access.concepts.RequestException;
42 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
43 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
44 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Frontend read-write transaction state as observed by the shard leader. This class is NOT thread-safe.
53  *
54  * @author Robert Varga
55  */
56 final class FrontendReadWriteTransaction extends FrontendTransaction {
57     private enum CommitStage {
58         READY,
59         CAN_COMMIT_PENDING,
60         CAN_COMMIT_COMPLETE,
61         PRE_COMMIT_PENDING,
62         PRE_COMMIT_COMPLETE,
63         COMMIT_PENDING,
64     }
65
66     private abstract static class State {
67         @Override
68         public abstract String toString();
69     }
70
71     private static final class Failed extends State {
72         final RequestException cause;
73
74         Failed(final RequestException cause) {
75             this.cause = requireNonNull(cause);
76         }
77
78         @Override
79         public String toString() {
80             return "FAILED (" + cause.getMessage() + ")";
81         }
82     }
83
84     private static final class Open extends State {
85         final ReadWriteShardDataTreeTransaction openTransaction;
86
87         Open(final ReadWriteShardDataTreeTransaction openTransaction) {
88             this.openTransaction = requireNonNull(openTransaction);
89         }
90
91         @Override
92         public String toString() {
93             return "OPEN";
94         }
95     }
96
97     private static final class Ready extends State {
98         final ShardDataTreeCohort readyCohort;
99         CommitStage stage;
100
101         Ready(final ShardDataTreeCohort readyCohort) {
102             this.readyCohort = requireNonNull(readyCohort);
103             this.stage = CommitStage.READY;
104         }
105
106         @Override
107         public String toString() {
108             return "READY (" + stage + ")";
109         }
110     }
111
112     private static final class Sealed extends State {
113         final DataTreeModification sealedModification;
114
115         Sealed(final DataTreeModification sealedModification) {
116             this.sealedModification = requireNonNull(sealedModification);
117         }
118
119         @Override
120         public String toString() {
121             return "SEALED";
122         }
123     }
124
125     /**
126      * Retired state, needed to catch and suppress callbacks after we have removed associated state.
127      */
128     private static final class Retired extends State {
129         private final String prevStateString;
130
131         Retired(final State prevState) {
132             prevStateString = prevState.toString();
133         }
134
135         @Override
136         public String toString() {
137             return "RETIRED (in " + prevStateString + ")";
138         }
139     }
140
141     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
142     private static final State ABORTED = new State() {
143         @Override
144         public String toString() {
145             return "ABORTED";
146         }
147     };
148     private static final State ABORTING = new State() {
149         @Override
150         public String toString() {
151             return "ABORTING";
152         }
153     };
154     private static final State COMMITTED = new State() {
155         @Override
156         public String toString() {
157             return "COMMITTED";
158         }
159     };
160
161     private State state;
162
163     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
164             final ReadWriteShardDataTreeTransaction transaction) {
165         super(history, id);
166         this.state = new Open(transaction);
167     }
168
169     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
170             final DataTreeModification mod) {
171         super(history, id);
172         this.state = new Sealed(mod);
173     }
174
175     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
176             final ReadWriteShardDataTreeTransaction transaction) {
177         return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
178     }
179
180     static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
181             final TransactionIdentifier id, final DataTreeModification mod) {
182         return new FrontendReadWriteTransaction(history, id, mod);
183     }
184
185     // Sequence has already been checked
186     @Override
187     TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
188             final long now) throws RequestException {
189         if (request instanceof ModifyTransactionRequest) {
190             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
191         } else if (request instanceof CommitLocalTransactionRequest) {
192             handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
193             return null;
194         } else if (request instanceof ExistsTransactionRequest) {
195             return handleExistsTransaction((ExistsTransactionRequest) request);
196         } else if (request instanceof ReadTransactionRequest) {
197             return handleReadTransaction((ReadTransactionRequest) request);
198         } else if (request instanceof TransactionPreCommitRequest) {
199             handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
200             return null;
201         } else if (request instanceof TransactionDoCommitRequest) {
202             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
203             return null;
204         } else if (request instanceof TransactionAbortRequest) {
205             return handleTransactionAbort(request.getSequence(), envelope, now);
206         } else if (request instanceof AbortLocalTransactionRequest) {
207             handleLocalTransactionAbort(request.getSequence(), envelope, now);
208             return null;
209         } else {
210             LOG.warn("Rejecting unsupported request {}", request);
211             throw new UnsupportedRequestException(request);
212         }
213     }
214
215     @Override
216     void retire() {
217         state = new Retired(state);
218     }
219
220     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
221             final RequestEnvelope envelope, final long now) throws RequestException {
222         throwIfFailed();
223
224         final Ready ready = checkReady();
225         switch (ready.stage) {
226             case PRE_COMMIT_PENDING:
227                 LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
228                 break;
229             case CAN_COMMIT_COMPLETE:
230                 ready.stage = CommitStage.PRE_COMMIT_PENDING;
231                 LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
232                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
233                     @Override
234                     public void onSuccess(final DataTreeCandidate result) {
235                         successfulPreCommit(envelope, now);
236                     }
237
238                     @Override
239                     public void onFailure(final Throwable failure) {
240                         failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
241                     }
242                 });
243                 break;
244             case CAN_COMMIT_PENDING:
245             case COMMIT_PENDING:
246             case PRE_COMMIT_COMPLETE:
247             case READY:
248                 throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
249             default:
250                 throwUnhandledCommitStage(ready);
251         }
252     }
253
254     void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
255         if (state instanceof Retired) {
256             LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
257                 getIdentifier());
258             return;
259         }
260
261         final Ready ready = checkReady();
262         LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
263         recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
264             envelope.getMessage().getSequence()));
265         ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
266     }
267
268     void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
269         if (state instanceof Retired) {
270             LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
271             return;
272         }
273
274         recordAndSendFailure(envelope, now, cause);
275         state = new Failed(cause);
276         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
277     }
278
279     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
280             final long now) throws RequestException {
281         throwIfFailed();
282
283         final Ready ready = checkReady();
284         switch (ready.stage) {
285             case COMMIT_PENDING:
286                 LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
287                 break;
288             case PRE_COMMIT_COMPLETE:
289                 ready.stage = CommitStage.COMMIT_PENDING;
290                 LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
291                 ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
292                     @Override
293                     public void onSuccess(final UnsignedLong result) {
294                         successfulCommit(envelope, now);
295                     }
296
297                     @Override
298                     public void onFailure(final Throwable failure) {
299                         failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
300                     }
301                 });
302                 break;
303             case CAN_COMMIT_COMPLETE:
304             case CAN_COMMIT_PENDING:
305             case PRE_COMMIT_PENDING:
306             case READY:
307                 throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
308             default:
309                 throwUnhandledCommitStage(ready);
310         }
311     }
312
313     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
314         checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
315             sequence)));
316     }
317
318     private void startAbort() {
319         state = ABORTING;
320         LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
321     }
322
323     private void finishAbort() {
324         state = ABORTED;
325         LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
326     }
327
328     private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
329             final long now) {
330         if (state instanceof Open) {
331             final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
332             startAbort();
333             openTransaction.abort(() -> {
334                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
335                     sequence));
336                 finishAbort();
337             });
338             return null;
339         }
340         if (ABORTING.equals(state)) {
341             LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
342             return null;
343         }
344         if (ABORTED.equals(state)) {
345             // We should have recorded the reply
346             LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
347             return new TransactionAbortSuccess(getIdentifier(), sequence);
348         }
349
350         final Ready ready = checkReady();
351         startAbort();
352         ready.readyCohort.abort(new FutureCallback<Void>() {
353             @Override
354             public void onSuccess(final Void result) {
355                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
356                 finishAbort();
357             }
358
359             @Override
360             public void onFailure(final Throwable failure) {
361                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
362                 LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
363                 finishAbort();
364             }
365         });
366         return null;
367     }
368
369     private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
370         throwIfFailed();
371
372         final Ready ready = checkReady();
373         switch (ready.stage) {
374             case CAN_COMMIT_PENDING:
375                 LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
376                 break;
377             case READY:
378                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
379                 LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
380                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
381                     @Override
382                     public void onSuccess(final Void result) {
383                         successfulCanCommit(envelope, now);
384                     }
385
386                     @Override
387                     public void onFailure(final Throwable failure) {
388                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
389                     }
390                 });
391                 break;
392             case CAN_COMMIT_COMPLETE:
393             case COMMIT_PENDING:
394             case PRE_COMMIT_COMPLETE:
395             case PRE_COMMIT_PENDING:
396                 throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
397             default:
398                 throwUnhandledCommitStage(ready);
399         }
400     }
401
402     void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
403         if (state instanceof Retired) {
404             LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
405                 getIdentifier());
406             return;
407         }
408
409         final Ready ready = checkReady();
410         recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
411             envelope.getMessage().getSequence()));
412         ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
413         LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
414     }
415
416     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
417         throwIfFailed();
418
419         final Ready ready = checkReady();
420         switch (ready.stage) {
421             case CAN_COMMIT_COMPLETE:
422             case CAN_COMMIT_PENDING:
423             case COMMIT_PENDING:
424             case PRE_COMMIT_COMPLETE:
425             case PRE_COMMIT_PENDING:
426                 LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
427                     getIdentifier(), state, envelope);
428                 break;
429             case READY:
430                 ready.stage = CommitStage.CAN_COMMIT_PENDING;
431                 LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
432                 ready.readyCohort.canCommit(new FutureCallback<Void>() {
433                     @Override
434                     public void onSuccess(final Void result) {
435                         successfulDirectCanCommit(envelope, now);
436                     }
437
438                     @Override
439                     public void onFailure(final Throwable failure) {
440                         failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
441                     }
442                 });
443                 break;
444             default:
445                 throwUnhandledCommitStage(ready);
446         }
447     }
448
449     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
450         if (state instanceof Retired) {
451             LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
452             return;
453         }
454
455         final Ready ready = checkReady();
456         ready.stage = CommitStage.PRE_COMMIT_PENDING;
457         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
458         ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
459             @Override
460             public void onSuccess(final DataTreeCandidate result) {
461                 successfulDirectPreCommit(envelope, startTime);
462             }
463
464             @Override
465             public void onFailure(final Throwable failure) {
466                 failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
467             }
468         });
469     }
470
471     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
472         if (state instanceof Retired) {
473             LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
474             return;
475         }
476
477         final Ready ready = checkReady();
478         ready.stage = CommitStage.COMMIT_PENDING;
479         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
480         ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
481             @Override
482             public void onSuccess(final UnsignedLong result) {
483                 successfulCommit(envelope, startTime);
484             }
485
486             @Override
487             public void onFailure(final Throwable failure) {
488                 failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
489             }
490         });
491     }
492
493     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
494         if (state instanceof Retired) {
495             LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
496             return;
497         }
498
499         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
500             envelope.getMessage().getSequence()));
501         state = COMMITTED;
502     }
503
504     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
505             final RequestEnvelope envelope, final long now) throws RequestException {
506         final DataTreeModification sealedModification = checkSealed();
507         if (!sealedModification.equals(request.getModification())) {
508             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
509             throw new UnsupportedRequestException(request);
510         }
511
512         final Optional<Exception> optFailure = request.getDelayedFailure();
513         if (optFailure.isPresent()) {
514             state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
515         } else {
516             state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, Optional.empty()));
517         }
518
519         if (request.isCoordinated()) {
520             coordinatedCommit(envelope, now);
521         } else {
522             directCommit(envelope, now);
523         }
524     }
525
526     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) {
527         final Optional<NormalizedNode> data = checkOpen().getSnapshot().readNode(request.getPath());
528         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
529             data.isPresent()));
530     }
531
532     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) {
533         final Optional<NormalizedNode> data = checkOpen().getSnapshot().readNode(request.getPath());
534         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
535             data));
536     }
537
538     private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
539         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
540     }
541
542     private void applyModifications(final Collection<TransactionModification> modifications) {
543         if (!modifications.isEmpty()) {
544             final DataTreeModification modification = checkOpen().getSnapshot();
545             for (TransactionModification m : modifications) {
546                 if (m instanceof TransactionDelete) {
547                     modification.delete(m.getPath());
548                 } else if (m instanceof TransactionWrite) {
549                     modification.write(m.getPath(), ((TransactionWrite) m).getData());
550                 } else if (m instanceof TransactionMerge) {
551                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
552                 } else {
553                     LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
554                 }
555             }
556         }
557     }
558
559     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
560             final RequestEnvelope envelope, final long now) throws RequestException {
561         // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
562         // protocol, there is nothing for us to do.
563         final Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
564         if (!maybeProto.isPresent()) {
565             applyModifications(request.getModifications());
566             return replyModifySuccess(request.getSequence());
567         }
568
569         switch (maybeProto.get()) {
570             case ABORT:
571                 if (ABORTING.equals(state)) {
572                     LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
573                     return null;
574                 }
575                 final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
576                 startAbort();
577                 openTransaction.abort(() -> {
578                     recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
579                         request.getSequence()));
580                     finishAbort();
581                 });
582                 return null;
583             case READY:
584                 ensureReady(request.getModifications());
585                 return replyModifySuccess(request.getSequence());
586             case SIMPLE:
587                 ensureReady(request.getModifications());
588                 directCommit(envelope, now);
589                 return null;
590             case THREE_PHASE:
591                 ensureReady(request.getModifications());
592                 coordinatedCommit(envelope, now);
593                 return null;
594             default:
595                 LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
596                 throw new UnsupportedRequestException(request);
597         }
598     }
599
600     private void ensureReady(final Collection<TransactionModification> modifications) {
601         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
602         // only once.
603         if (state instanceof Ready) {
604             LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
605             return;
606         }
607
608         applyModifications(modifications);
609         state = new Ready(checkOpen().ready(Optional.empty()));
610         LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
611     }
612
613     private void throwIfFailed() throws RequestException {
614         if (state instanceof Failed) {
615             LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
616             throw ((Failed) state).cause;
617         }
618     }
619
620     private ReadWriteShardDataTreeTransaction checkOpen() {
621         checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), state);
622         return ((Open) state).openTransaction;
623     }
624
625     private Ready checkReady() {
626         checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), state);
627         return (Ready) state;
628     }
629
630     private DataTreeModification checkSealed() {
631         checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), state);
632         return ((Sealed) state).sealedModification;
633     }
634
635     private static void throwUnhandledCommitStage(final Ready ready) {
636         throw new IllegalStateException("Unhandled commit stage " + ready.stage);
637     }
638 }