Handle 3PC message backwards compatibility
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Queue;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
27 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
35 import org.slf4j.Logger;
36
37 /**
38  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
39  *
40  * @author Thomas Pantelis
41  */
42 class ShardCommitCoordinator {
43
44     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
45     public interface CohortDecorator {
46         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
47     }
48
49     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
50
51     private CohortEntry currentCohortEntry;
52
53     private final ShardDataTree dataTree;
54
55     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
56     // since this should only be accessed on the shard's dispatcher.
57     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
58
59     private int queueCapacity;
60
61     private final Logger log;
62
63     private final String name;
64
65     private final long cacheExpiryTimeoutInMillis;
66
67     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
68     private CohortDecorator cohortDecorator;
69
70     private ReadyTransactionReply readyTransactionReply;
71
72     private Runnable runOnPendingTransactionsComplete;
73
74     ShardCommitCoordinator(ShardDataTree dataTree,
75             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
76
77         this.queueCapacity = queueCapacity;
78         this.log = log;
79         this.name = name;
80         this.dataTree = Preconditions.checkNotNull(dataTree);
81         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
82     }
83
84     int getQueueSize() {
85         return queuedCohortEntries.size();
86     }
87
88     int getCohortCacheSize() {
89         return cohortCache.size();
90     }
91
92     void setQueueCapacity(int queueCapacity) {
93         this.queueCapacity = queueCapacity;
94     }
95
96     private ReadyTransactionReply readyTransactionReply(Shard shard) {
97         if(readyTransactionReply == null) {
98             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
99         }
100
101         return readyTransactionReply;
102     }
103
104     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
105         if(queuedCohortEntries.size() < queueCapacity) {
106             queuedCohortEntries.offer(cohortEntry);
107
108             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
109                     queuedCohortEntries.size());
110
111             return true;
112         } else {
113             cohortCache.remove(cohortEntry.getTransactionID());
114
115             RuntimeException ex = new RuntimeException(
116                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
117                                   " capacity %d has been reached.",
118                                   name, cohortEntry.getTransactionID(), queueCapacity));
119             log.error(ex.getMessage());
120             sender.tell(new Failure(ex), shard.self());
121             return false;
122         }
123     }
124
125     /**
126      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
127      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
128      *
129      * @param ready the ForwardedReadyTransaction message to process
130      * @param sender the sender of the message
131      * @param shard the transaction's shard actor
132      */
133     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
134         log.debug("{}: Readying transaction {}, client version {}", name,
135                 ready.getTransactionID(), ready.getTxnClientVersion());
136
137         ShardDataTreeCohort cohort = ready.getTransaction().ready();
138         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
139         cohortCache.put(ready.getTransactionID(), cohortEntry);
140
141         if(!queueCohortEntry(cohortEntry, sender, shard)) {
142             return;
143         }
144
145         if(ready.isDoImmediateCommit()) {
146             cohortEntry.setDoImmediateCommit(true);
147             cohortEntry.setReplySender(sender);
148             cohortEntry.setShard(shard);
149             handleCanCommit(cohortEntry);
150         } else {
151             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
152             // front-end so send back a ReadyTransactionReply with our actor path.
153             sender.tell(readyTransactionReply(shard), shard.self());
154         }
155     }
156
157     /**
158      * This method handles a BatchedModifications message for a transaction being prepared directly on the
159      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
160      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
161      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
162      *
163      * @param batched the BatchedModifications message to process
164      * @param sender the sender of the message
165      * @param shard the transaction's shard actor
166      */
167     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
168         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
169         if(cohortEntry == null) {
170             cohortEntry = new CohortEntry(batched.getTransactionID(),
171                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
172                         batched.getTransactionChainID()), batched.getVersion());
173             cohortCache.put(batched.getTransactionID(), cohortEntry);
174         }
175
176         if(log.isDebugEnabled()) {
177             log.debug("{}: Applying {} batched modifications for Tx {}", name,
178                     batched.getModifications().size(), batched.getTransactionID());
179         }
180
181         cohortEntry.applyModifications(batched.getModifications());
182
183         if(batched.isReady()) {
184             if(cohortEntry.getLastBatchedModificationsException() != null) {
185                 cohortCache.remove(cohortEntry.getTransactionID());
186                 throw cohortEntry.getLastBatchedModificationsException();
187             }
188
189             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
190                 cohortCache.remove(cohortEntry.getTransactionID());
191                 throw new IllegalStateException(String.format(
192                         "The total number of batched messages received %d does not match the number sent %d",
193                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
194             }
195
196             if(!queueCohortEntry(cohortEntry, sender, shard)) {
197                 return;
198             }
199
200             if(log.isDebugEnabled()) {
201                 log.debug("{}: Readying Tx {}, client version {}", name,
202                         batched.getTransactionID(), batched.getVersion());
203             }
204
205             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
206
207             if(batched.isDoCommitOnReady()) {
208                 cohortEntry.setReplySender(sender);
209                 cohortEntry.setShard(shard);
210                 handleCanCommit(cohortEntry);
211             } else {
212                 sender.tell(readyTransactionReply(shard), shard.self());
213             }
214         } else {
215             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
216         }
217     }
218
219     /**
220      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
221      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
222      *
223      * @param message the ReadyLocalTransaction message to process
224      * @param sender the sender of the message
225      * @param shard the transaction's shard actor
226      */
227     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
228         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
229                 message.getTransactionID());
230         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
231                 DataStoreVersions.CURRENT_VERSION);
232         cohortCache.put(message.getTransactionID(), cohortEntry);
233         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
234
235         if(!queueCohortEntry(cohortEntry, sender, shard)) {
236             return;
237         }
238
239         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
240
241         if (message.isDoCommitOnReady()) {
242             cohortEntry.setReplySender(sender);
243             cohortEntry.setShard(shard);
244             handleCanCommit(cohortEntry);
245         } else {
246             sender.tell(readyTransactionReply(shard), shard.self());
247         }
248     }
249
250     private void handleCanCommit(CohortEntry cohortEntry) {
251         String transactionID = cohortEntry.getTransactionID();
252
253         cohortEntry.updateLastAccessTime();
254
255         if(currentCohortEntry != null) {
256             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
257             // queue and will get processed after all prior entries complete.
258
259             if(log.isDebugEnabled()) {
260                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
261                         name, currentCohortEntry.getTransactionID(), transactionID);
262             }
263
264             return;
265         }
266
267         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
268         // it the current entry and proceed with canCommit.
269         // Purposely checking reference equality here.
270         if(queuedCohortEntries.peek() == cohortEntry) {
271             currentCohortEntry = queuedCohortEntries.poll();
272             doCanCommit(currentCohortEntry);
273         } else {
274             if(log.isDebugEnabled()) {
275                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
276                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
277             }
278         }
279     }
280
281     /**
282      * This method handles the canCommit phase for a transaction.
283      *
284      * @param transactionID the ID of the transaction to canCommit
285      * @param sender the actor to which to send the response
286      * @param shard the transaction's shard actor
287      */
288     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
289         // Lookup the cohort entry that was cached previously (or should have been) by
290         // transactionReady (via the ForwardedReadyTransaction message).
291         final CohortEntry cohortEntry = cohortCache.get(transactionID);
292         if(cohortEntry == null) {
293             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
294             // between canCommit and ready and the entry was expired from the cache.
295             IllegalStateException ex = new IllegalStateException(
296                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
297             log.error(ex.getMessage());
298             sender.tell(new Failure(ex), shard.self());
299             return;
300         }
301
302         cohortEntry.setReplySender(sender);
303         cohortEntry.setShard(shard);
304
305         handleCanCommit(cohortEntry);
306     }
307
308     private void doCanCommit(final CohortEntry cohortEntry) {
309         boolean canCommit = false;
310         try {
311             canCommit = cohortEntry.canCommit();
312
313             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
314
315             if(cohortEntry.isDoImmediateCommit()) {
316                 if(canCommit) {
317                     doCommit(cohortEntry);
318                 } else {
319                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
320                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
321                 }
322             } else {
323                 // FIXME - use caller's version
324                 cohortEntry.getReplySender().tell(
325                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
326                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
327                         cohortEntry.getShard().self());
328             }
329         } catch (Exception e) {
330             log.debug("{}: An exception occurred during canCommit", name, e);
331
332             Throwable failure = e;
333             if(e instanceof ExecutionException) {
334                 failure = e.getCause();
335             }
336
337             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
338         } finally {
339             if(!canCommit) {
340                 // Remove the entry from the cache now.
341                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
342             }
343         }
344     }
345
346     private boolean doCommit(CohortEntry cohortEntry) {
347         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
348
349         boolean success = false;
350
351         // We perform the preCommit phase here atomically with the commit phase. This is an
352         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
353         // coordination of preCommit across shards in case of failure but preCommit should not
354         // normally fail since we ensure only one concurrent 3-phase commit.
355
356         try {
357             cohortEntry.preCommit();
358
359             cohortEntry.getShard().continueCommit(cohortEntry);
360
361             cohortEntry.updateLastAccessTime();
362
363             success = true;
364         } catch (Exception e) {
365             log.error("{} An exception occurred while preCommitting transaction {}",
366                     name, cohortEntry.getTransactionID(), e);
367             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
368
369             currentTransactionComplete(cohortEntry.getTransactionID(), true);
370         }
371
372         return success;
373     }
374
375     /**
376      * This method handles the preCommit and commit phases for a transaction.
377      *
378      * @param transactionID the ID of the transaction to commit
379      * @param sender the actor to which to send the response
380      * @param shard the transaction's shard actor
381      * @return true if the transaction was successfully prepared, false otherwise.
382      */
383     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
384         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
385         // this transaction.
386         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
387         if(cohortEntry == null) {
388             // We're not the current Tx - the Tx was likely expired b/c it took too long in
389             // between the canCommit and commit messages.
390             IllegalStateException ex = new IllegalStateException(
391                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
392                             name, transactionID));
393             log.error(ex.getMessage());
394             sender.tell(new Failure(ex), shard.self());
395             return false;
396         }
397
398         cohortEntry.setReplySender(sender);
399         return doCommit(cohortEntry);
400     }
401
402     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
403         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
404         if(cohortEntry != null) {
405             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
406             // aborted during replication in which case we may still commit locally if replication
407             // succeeds.
408             currentTransactionComplete(transactionID, false);
409         } else {
410             cohortEntry = getAndRemoveCohortEntry(transactionID);
411         }
412
413         if(cohortEntry == null) {
414             return;
415         }
416
417         log.debug("{}: Aborting transaction {}", name, transactionID);
418
419         final ActorRef self = shard.getSelf();
420         try {
421             cohortEntry.abort();
422
423             shard.getShardMBean().incrementAbortTransactionsCount();
424
425             if(sender != null) {
426                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
427             }
428         } catch (Exception e) {
429             log.error("{}: An exception happened during abort", name, e);
430
431             if(sender != null) {
432                 sender.tell(new Failure(e), self);
433             }
434         }
435     }
436
437     void checkForExpiredTransactions(final long timeout, final Shard shard) {
438         CohortEntry cohortEntry = getCurrentCohortEntry();
439         if(cohortEntry != null) {
440             if(cohortEntry.isExpired(timeout)) {
441                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
442                         name, cohortEntry.getTransactionID(), timeout);
443
444                 handleAbort(cohortEntry.getTransactionID(), null, shard);
445             }
446         }
447
448         cleanupExpiredCohortEntries();
449     }
450
451     void abortPendingTransactions(final String reason, final Shard shard) {
452         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
453             return;
454         }
455
456         List<CohortEntry> cohortEntries = new ArrayList<>();
457
458         if(currentCohortEntry != null) {
459             cohortEntries.add(currentCohortEntry);
460             currentCohortEntry = null;
461         }
462
463         cohortEntries.addAll(queuedCohortEntries);
464         queuedCohortEntries.clear();
465
466         for(CohortEntry cohortEntry: cohortEntries) {
467             if(cohortEntry.getReplySender() != null) {
468                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
469             }
470         }
471     }
472
473     /**
474      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
475      * matches the current entry.
476      *
477      * @param transactionID the ID of the transaction
478      * @return the current CohortEntry or null if the given transaction ID does not match the
479      *         current entry.
480      */
481     CohortEntry getCohortEntryIfCurrent(String transactionID) {
482         if(isCurrentTransaction(transactionID)) {
483             return currentCohortEntry;
484         }
485
486         return null;
487     }
488
489     CohortEntry getCurrentCohortEntry() {
490         return currentCohortEntry;
491     }
492
493     CohortEntry getAndRemoveCohortEntry(String transactionID) {
494         return cohortCache.remove(transactionID);
495     }
496
497     boolean isCurrentTransaction(String transactionID) {
498         return currentCohortEntry != null &&
499                 currentCohortEntry.getTransactionID().equals(transactionID);
500     }
501
502     /**
503      * This method is called when a transaction is complete, successful or not. If the given
504      * given transaction ID matches the current in-progress transaction, the next cohort entry,
505      * if any, is dequeued and processed.
506      *
507      * @param transactionID the ID of the completed transaction
508      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
509      *        the cache.
510      */
511     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
512         if(removeCohortEntry) {
513             cohortCache.remove(transactionID);
514         }
515
516         if(isCurrentTransaction(transactionID)) {
517             currentCohortEntry = null;
518
519             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
520
521             maybeProcessNextCohortEntry();
522         }
523     }
524
525     private void maybeProcessNextCohortEntry() {
526         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
527         // clean out expired entries.
528         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
529         while(iter.hasNext()) {
530             CohortEntry next = iter.next();
531             if(next.isReadyToCommit()) {
532                 if(currentCohortEntry == null) {
533                     if(log.isDebugEnabled()) {
534                         log.debug("{}: Next entry to canCommit {}", name, next);
535                     }
536
537                     iter.remove();
538                     currentCohortEntry = next;
539                     currentCohortEntry.updateLastAccessTime();
540                     doCanCommit(currentCohortEntry);
541                 }
542
543                 break;
544             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
545                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
546                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
547             } else if(!next.isAborted()) {
548                 break;
549             }
550
551             iter.remove();
552             cohortCache.remove(next.getTransactionID());
553         }
554
555         maybeRunOperationOnPendingTransactionsComplete();
556     }
557
558     void cleanupExpiredCohortEntries() {
559         maybeProcessNextCohortEntry();
560     }
561
562     void setRunOnPendingTransactionsComplete(Runnable operation) {
563         runOnPendingTransactionsComplete = operation;
564         maybeRunOperationOnPendingTransactionsComplete();
565     }
566
567     private void maybeRunOperationOnPendingTransactionsComplete() {
568         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
569             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
570
571             runOnPendingTransactionsComplete.run();
572             runOnPendingTransactionsComplete = null;
573         }
574     }
575
576     @VisibleForTesting
577     void setCohortDecorator(CohortDecorator cohortDecorator) {
578         this.cohortDecorator = cohortDecorator;
579     }
580
581     static class CohortEntry {
582         private final String transactionID;
583         private ShardDataTreeCohort cohort;
584         private final ReadWriteShardDataTreeTransaction transaction;
585         private RuntimeException lastBatchedModificationsException;
586         private ActorRef replySender;
587         private Shard shard;
588         private boolean doImmediateCommit;
589         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
590         private int totalBatchedModificationsReceived;
591         private boolean aborted;
592         private final short clientVersion;
593
594         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
595             this.transaction = Preconditions.checkNotNull(transaction);
596             this.transactionID = transactionID;
597             this.clientVersion = clientVersion;
598         }
599
600         CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
601             this.transactionID = transactionID;
602             this.cohort = cohort;
603             this.transaction = null;
604             this.clientVersion = clientVersion;
605         }
606
607         void updateLastAccessTime() {
608             lastAccessTimer.reset();
609             lastAccessTimer.start();
610         }
611
612         String getTransactionID() {
613             return transactionID;
614         }
615
616         short getClientVersion() {
617             return clientVersion;
618         }
619
620         DataTreeCandidate getCandidate() {
621             return cohort.getCandidate();
622         }
623
624         int getTotalBatchedModificationsReceived() {
625             return totalBatchedModificationsReceived;
626         }
627
628         RuntimeException getLastBatchedModificationsException() {
629             return lastBatchedModificationsException;
630         }
631
632         void applyModifications(Iterable<Modification> modifications) {
633             totalBatchedModificationsReceived++;
634             if(lastBatchedModificationsException == null) {
635                 for (Modification modification : modifications) {
636                         try {
637                             modification.apply(transaction.getSnapshot());
638                         } catch (RuntimeException e) {
639                             lastBatchedModificationsException = e;
640                             throw e;
641                         }
642                 }
643             }
644         }
645
646         boolean canCommit() throws InterruptedException, ExecutionException {
647             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
648             // about possibly accessing our state on a different thread outside of our dispatcher.
649             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
650             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
651             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
652             return cohort.canCommit().get();
653         }
654
655         void preCommit() throws InterruptedException, ExecutionException {
656             cohort.preCommit().get();
657         }
658
659         void commit() throws InterruptedException, ExecutionException {
660             cohort.commit().get();
661         }
662
663         void abort() throws InterruptedException, ExecutionException {
664             aborted = true;
665             cohort.abort().get();
666         }
667
668         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
669             Preconditions.checkState(cohort == null, "cohort was already set");
670
671             setDoImmediateCommit(doImmediateCommit);
672
673             cohort = transaction.ready();
674
675             if(cohortDecorator != null) {
676                 // Call the hook for unit tests.
677                 cohort = cohortDecorator.decorate(transactionID, cohort);
678             }
679         }
680
681         boolean isReadyToCommit() {
682             return replySender != null;
683         }
684
685         boolean isExpired(long expireTimeInMillis) {
686             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
687         }
688
689         boolean isDoImmediateCommit() {
690             return doImmediateCommit;
691         }
692
693         void setDoImmediateCommit(boolean doImmediateCommit) {
694             this.doImmediateCommit = doImmediateCommit;
695         }
696
697         ActorRef getReplySender() {
698             return replySender;
699         }
700
701         void setReplySender(ActorRef replySender) {
702             this.replySender = replySender;
703         }
704
705         Shard getShard() {
706             return shard;
707         }
708
709         void setShard(Shard shard) {
710             this.shard = shard;
711         }
712
713
714         boolean isAborted() {
715             return aborted;
716         }
717
718         @Override
719         public String toString() {
720             StringBuilder builder = new StringBuilder();
721             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
722                     .append(doImmediateCommit).append("]");
723             return builder.toString();
724         }
725     }
726 }