Abort pending txn's in Shard on leader transition and shutdown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.Iterator;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Queue;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.slf4j.Logger;
37
38 /**
39  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
40  *
41  * @author Thomas Pantelis
42  */
43 class ShardCommitCoordinator {
44
45     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
46     public interface CohortDecorator {
47         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
48     }
49
50     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
51
52     private CohortEntry currentCohortEntry;
53
54     private final ShardDataTree dataTree;
55
56     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
57     // since this should only be accessed on the shard's dispatcher.
58     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
59
60     private int queueCapacity;
61
62     private final Logger log;
63
64     private final String name;
65
66     private final long cacheExpiryTimeoutInMillis;
67
68     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
69     private CohortDecorator cohortDecorator;
70
71     private ReadyTransactionReply readyTransactionReply;
72
73     private Runnable runOnPendingTransactionsComplete;
74
75     ShardCommitCoordinator(ShardDataTree dataTree,
76             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
77
78         this.queueCapacity = queueCapacity;
79         this.log = log;
80         this.name = name;
81         this.dataTree = Preconditions.checkNotNull(dataTree);
82         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
83     }
84
85     int getQueueSize() {
86         return queuedCohortEntries.size();
87     }
88
89     int getCohortCacheSize() {
90         return cohortCache.size();
91     }
92
93     void setQueueCapacity(int queueCapacity) {
94         this.queueCapacity = queueCapacity;
95     }
96
97     private ReadyTransactionReply readyTransactionReply(Shard shard) {
98         if(readyTransactionReply == null) {
99             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
100         }
101
102         return readyTransactionReply;
103     }
104
105     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
106         if(queuedCohortEntries.size() < queueCapacity) {
107             queuedCohortEntries.offer(cohortEntry);
108
109             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
110                     queuedCohortEntries.size());
111
112             return true;
113         } else {
114             cohortCache.remove(cohortEntry.getTransactionID());
115
116             RuntimeException ex = new RuntimeException(
117                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
118                                   " capacity %d has been reached.",
119                                   name, cohortEntry.getTransactionID(), queueCapacity));
120             log.error(ex.getMessage());
121             sender.tell(new Failure(ex), shard.self());
122             return false;
123         }
124     }
125
126     /**
127      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
128      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
129      *
130      * @param ready the ForwardedReadyTransaction message to process
131      * @param sender the sender of the message
132      * @param shard the transaction's shard actor
133      */
134     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
135         log.debug("{}: Readying transaction {}, client version {}", name,
136                 ready.getTransactionID(), ready.getTxnClientVersion());
137
138         ShardDataTreeCohort cohort = ready.getTransaction().ready();
139         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
140         cohortCache.put(ready.getTransactionID(), cohortEntry);
141
142         if(!queueCohortEntry(cohortEntry, sender, shard)) {
143             return;
144         }
145
146         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
147             // Return our actor path as we'll handle the three phase commit except if the Tx client
148             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
149             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
150             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
151             ActorRef replyActorPath = shard.self();
152             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
153                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
154                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
155                         ready.getTransactionID()));
156             }
157
158             ReadyTransactionReply readyTransactionReply =
159                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
160                             ready.getTxnClientVersion());
161             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
162                 readyTransactionReply, shard.self());
163         } else {
164             if(ready.isDoImmediateCommit()) {
165                 cohortEntry.setDoImmediateCommit(true);
166                 cohortEntry.setReplySender(sender);
167                 cohortEntry.setShard(shard);
168                 handleCanCommit(cohortEntry);
169             } else {
170                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
171                 // front-end so send back a ReadyTransactionReply with our actor path.
172                 sender.tell(readyTransactionReply(shard), shard.self());
173             }
174         }
175     }
176
177     /**
178      * This method handles a BatchedModifications message for a transaction being prepared directly on the
179      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
180      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
181      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
182      *
183      * @param batched the BatchedModifications message to process
184      * @param sender the sender of the message
185      * @param shard the transaction's shard actor
186      */
187     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
188         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
189         if(cohortEntry == null) {
190             cohortEntry = new CohortEntry(batched.getTransactionID(),
191                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
192                         batched.getTransactionChainID()));
193             cohortCache.put(batched.getTransactionID(), cohortEntry);
194         }
195
196         if(log.isDebugEnabled()) {
197             log.debug("{}: Applying {} batched modifications for Tx {}", name,
198                     batched.getModifications().size(), batched.getTransactionID());
199         }
200
201         cohortEntry.applyModifications(batched.getModifications());
202
203         if(batched.isReady()) {
204             if(cohortEntry.getLastBatchedModificationsException() != null) {
205                 cohortCache.remove(cohortEntry.getTransactionID());
206                 throw cohortEntry.getLastBatchedModificationsException();
207             }
208
209             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
210                 cohortCache.remove(cohortEntry.getTransactionID());
211                 throw new IllegalStateException(String.format(
212                         "The total number of batched messages received %d does not match the number sent %d",
213                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
214             }
215
216             if(!queueCohortEntry(cohortEntry, sender, shard)) {
217                 return;
218             }
219
220             if(log.isDebugEnabled()) {
221                 log.debug("{}: Readying Tx {}, client version {}", name,
222                         batched.getTransactionID(), batched.getVersion());
223             }
224
225             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
226
227             if(batched.isDoCommitOnReady()) {
228                 cohortEntry.setReplySender(sender);
229                 cohortEntry.setShard(shard);
230                 handleCanCommit(cohortEntry);
231             } else {
232                 sender.tell(readyTransactionReply(shard), shard.self());
233             }
234         } else {
235             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
236         }
237     }
238
239     /**
240      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
241      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
242      *
243      * @param message the ReadyLocalTransaction message to process
244      * @param sender the sender of the message
245      * @param shard the transaction's shard actor
246      */
247     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
248         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
249                 message.getTransactionID());
250         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
251         cohortCache.put(message.getTransactionID(), cohortEntry);
252         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
253
254         if(!queueCohortEntry(cohortEntry, sender, shard)) {
255             return;
256         }
257
258         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
259
260         if (message.isDoCommitOnReady()) {
261             cohortEntry.setReplySender(sender);
262             cohortEntry.setShard(shard);
263             handleCanCommit(cohortEntry);
264         } else {
265             sender.tell(readyTransactionReply(shard), shard.self());
266         }
267     }
268
269     private void handleCanCommit(CohortEntry cohortEntry) {
270         String transactionID = cohortEntry.getTransactionID();
271
272         cohortEntry.updateLastAccessTime();
273
274         if(currentCohortEntry != null) {
275             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
276             // queue and will get processed after all prior entries complete.
277
278             if(log.isDebugEnabled()) {
279                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
280                         name, currentCohortEntry.getTransactionID(), transactionID);
281             }
282
283             return;
284         }
285
286         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
287         // it the current entry and proceed with canCommit.
288         // Purposely checking reference equality here.
289         if(queuedCohortEntries.peek() == cohortEntry) {
290             currentCohortEntry = queuedCohortEntries.poll();
291             doCanCommit(currentCohortEntry);
292         } else {
293             if(log.isDebugEnabled()) {
294                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
295                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
296             }
297         }
298     }
299
300     /**
301      * This method handles the canCommit phase for a transaction.
302      *
303      * @param transactionID the ID of the transaction to canCommit
304      * @param sender the actor to which to send the response
305      * @param shard the transaction's shard actor
306      */
307     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
308         // Lookup the cohort entry that was cached previously (or should have been) by
309         // transactionReady (via the ForwardedReadyTransaction message).
310         final CohortEntry cohortEntry = cohortCache.get(transactionID);
311         if(cohortEntry == null) {
312             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
313             // between canCommit and ready and the entry was expired from the cache.
314             IllegalStateException ex = new IllegalStateException(
315                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
316             log.error(ex.getMessage());
317             sender.tell(new Failure(ex), shard.self());
318             return;
319         }
320
321         cohortEntry.setReplySender(sender);
322         cohortEntry.setShard(shard);
323
324         handleCanCommit(cohortEntry);
325     }
326
327     private void doCanCommit(final CohortEntry cohortEntry) {
328         boolean canCommit = false;
329         try {
330             canCommit = cohortEntry.canCommit();
331
332             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
333
334             if(cohortEntry.isDoImmediateCommit()) {
335                 if(canCommit) {
336                     doCommit(cohortEntry);
337                 } else {
338                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
339                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
340                 }
341             } else {
342                 cohortEntry.getReplySender().tell(
343                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
344                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
345             }
346         } catch (Exception e) {
347             log.debug("{}: An exception occurred during canCommit", name, e);
348
349             Throwable failure = e;
350             if(e instanceof ExecutionException) {
351                 failure = e.getCause();
352             }
353
354             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
355         } finally {
356             if(!canCommit) {
357                 // Remove the entry from the cache now.
358                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
359             }
360         }
361     }
362
363     private boolean doCommit(CohortEntry cohortEntry) {
364         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
365
366         boolean success = false;
367
368         // We perform the preCommit phase here atomically with the commit phase. This is an
369         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
370         // coordination of preCommit across shards in case of failure but preCommit should not
371         // normally fail since we ensure only one concurrent 3-phase commit.
372
373         try {
374             cohortEntry.preCommit();
375
376             cohortEntry.getShard().continueCommit(cohortEntry);
377
378             cohortEntry.updateLastAccessTime();
379
380             success = true;
381         } catch (Exception e) {
382             log.error("{} An exception occurred while preCommitting transaction {}",
383                     name, cohortEntry.getTransactionID(), e);
384             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
385
386             currentTransactionComplete(cohortEntry.getTransactionID(), true);
387         }
388
389         return success;
390     }
391
392     /**
393      * This method handles the preCommit and commit phases for a transaction.
394      *
395      * @param transactionID the ID of the transaction to commit
396      * @param sender the actor to which to send the response
397      * @param shard the transaction's shard actor
398      * @return true if the transaction was successfully prepared, false otherwise.
399      */
400     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
401         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
402         // this transaction.
403         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
404         if(cohortEntry == null) {
405             // We're not the current Tx - the Tx was likely expired b/c it took too long in
406             // between the canCommit and commit messages.
407             IllegalStateException ex = new IllegalStateException(
408                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
409                             name, transactionID));
410             log.error(ex.getMessage());
411             sender.tell(new Failure(ex), shard.self());
412             return false;
413         }
414
415         cohortEntry.setReplySender(sender);
416         return doCommit(cohortEntry);
417     }
418
419     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
420         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
421         if(cohortEntry != null) {
422             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
423             // aborted during replication in which case we may still commit locally if replication
424             // succeeds.
425             currentTransactionComplete(transactionID, false);
426         } else {
427             cohortEntry = getAndRemoveCohortEntry(transactionID);
428         }
429
430         if(cohortEntry == null) {
431             return;
432         }
433
434         log.debug("{}: Aborting transaction {}", name, transactionID);
435
436         final ActorRef self = shard.getSelf();
437         try {
438             cohortEntry.abort();
439
440             shard.getShardMBean().incrementAbortTransactionsCount();
441
442             if(sender != null) {
443                 sender.tell(new AbortTransactionReply().toSerializable(), self);
444             }
445         } catch (Exception e) {
446             log.error("{}: An exception happened during abort", name, e);
447
448             if(sender != null) {
449                 sender.tell(new Failure(e), self);
450             }
451         }
452     }
453
454     void checkForExpiredTransactions(final long timeout, final Shard shard) {
455         CohortEntry cohortEntry = getCurrentCohortEntry();
456         if(cohortEntry != null) {
457             if(cohortEntry.isExpired(timeout)) {
458                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
459                         name, cohortEntry.getTransactionID(), timeout);
460
461                 handleAbort(cohortEntry.getTransactionID(), null, shard);
462             }
463         }
464
465         cleanupExpiredCohortEntries();
466     }
467
468     void abortPendingTransactions(final String reason, final Shard shard) {
469         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
470             return;
471         }
472
473         List<CohortEntry> cohortEntries = new ArrayList<>();
474
475         if(currentCohortEntry != null) {
476             cohortEntries.add(currentCohortEntry);
477             currentCohortEntry = null;
478         }
479
480         cohortEntries.addAll(queuedCohortEntries);
481         queuedCohortEntries.clear();
482
483         for(CohortEntry cohortEntry: cohortEntries) {
484             if(cohortEntry.getReplySender() != null) {
485                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
486             }
487         }
488     }
489
490     /**
491      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
492      * matches the current entry.
493      *
494      * @param transactionID the ID of the transaction
495      * @return the current CohortEntry or null if the given transaction ID does not match the
496      *         current entry.
497      */
498     CohortEntry getCohortEntryIfCurrent(String transactionID) {
499         if(isCurrentTransaction(transactionID)) {
500             return currentCohortEntry;
501         }
502
503         return null;
504     }
505
506     CohortEntry getCurrentCohortEntry() {
507         return currentCohortEntry;
508     }
509
510     CohortEntry getAndRemoveCohortEntry(String transactionID) {
511         return cohortCache.remove(transactionID);
512     }
513
514     boolean isCurrentTransaction(String transactionID) {
515         return currentCohortEntry != null &&
516                 currentCohortEntry.getTransactionID().equals(transactionID);
517     }
518
519     /**
520      * This method is called when a transaction is complete, successful or not. If the given
521      * given transaction ID matches the current in-progress transaction, the next cohort entry,
522      * if any, is dequeued and processed.
523      *
524      * @param transactionID the ID of the completed transaction
525      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
526      *        the cache.
527      */
528     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
529         if(removeCohortEntry) {
530             cohortCache.remove(transactionID);
531         }
532
533         if(isCurrentTransaction(transactionID)) {
534             currentCohortEntry = null;
535
536             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
537
538             maybeProcessNextCohortEntry();
539         }
540     }
541
542     private void maybeProcessNextCohortEntry() {
543         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
544         // clean out expired entries.
545         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
546         while(iter.hasNext()) {
547             CohortEntry next = iter.next();
548             if(next.isReadyToCommit()) {
549                 if(currentCohortEntry == null) {
550                     if(log.isDebugEnabled()) {
551                         log.debug("{}: Next entry to canCommit {}", name, next);
552                     }
553
554                     iter.remove();
555                     currentCohortEntry = next;
556                     currentCohortEntry.updateLastAccessTime();
557                     doCanCommit(currentCohortEntry);
558                 }
559
560                 break;
561             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
562                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
563                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
564             } else if(!next.isAborted()) {
565                 break;
566             }
567
568             iter.remove();
569             cohortCache.remove(next.getTransactionID());
570         }
571
572         maybeRunOperationOnPendingTransactionsComplete();
573     }
574
575     void cleanupExpiredCohortEntries() {
576         maybeProcessNextCohortEntry();
577     }
578
579     void setRunOnPendingTransactionsComplete(Runnable operation) {
580         runOnPendingTransactionsComplete = operation;
581         maybeRunOperationOnPendingTransactionsComplete();
582     }
583
584     private void maybeRunOperationOnPendingTransactionsComplete() {
585         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
586             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
587
588             runOnPendingTransactionsComplete.run();
589             runOnPendingTransactionsComplete = null;
590         }
591     }
592
593     @VisibleForTesting
594     void setCohortDecorator(CohortDecorator cohortDecorator) {
595         this.cohortDecorator = cohortDecorator;
596     }
597
598     static class CohortEntry {
599         private final String transactionID;
600         private ShardDataTreeCohort cohort;
601         private final ReadWriteShardDataTreeTransaction transaction;
602         private RuntimeException lastBatchedModificationsException;
603         private ActorRef replySender;
604         private Shard shard;
605         private boolean doImmediateCommit;
606         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
607         private int totalBatchedModificationsReceived;
608         private boolean aborted;
609
610         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
611             this.transaction = Preconditions.checkNotNull(transaction);
612             this.transactionID = transactionID;
613         }
614
615         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
616             this.transactionID = transactionID;
617             this.cohort = cohort;
618             this.transaction = null;
619         }
620
621         void updateLastAccessTime() {
622             lastAccessTimer.reset();
623             lastAccessTimer.start();
624         }
625
626         String getTransactionID() {
627             return transactionID;
628         }
629
630         DataTreeCandidate getCandidate() {
631             return cohort.getCandidate();
632         }
633
634         int getTotalBatchedModificationsReceived() {
635             return totalBatchedModificationsReceived;
636         }
637
638         RuntimeException getLastBatchedModificationsException() {
639             return lastBatchedModificationsException;
640         }
641
642         void applyModifications(Iterable<Modification> modifications) {
643             totalBatchedModificationsReceived++;
644             if(lastBatchedModificationsException == null) {
645                 for (Modification modification : modifications) {
646                         try {
647                             modification.apply(transaction.getSnapshot());
648                         } catch (RuntimeException e) {
649                             lastBatchedModificationsException = e;
650                             throw e;
651                         }
652                 }
653             }
654         }
655
656         boolean canCommit() throws InterruptedException, ExecutionException {
657             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
658             // about possibly accessing our state on a different thread outside of our dispatcher.
659             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
660             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
661             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
662             return cohort.canCommit().get();
663         }
664
665         void preCommit() throws InterruptedException, ExecutionException {
666             cohort.preCommit().get();
667         }
668
669         void commit() throws InterruptedException, ExecutionException {
670             cohort.commit().get();
671         }
672
673         void abort() throws InterruptedException, ExecutionException {
674             aborted = true;
675             cohort.abort().get();
676         }
677
678         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
679             Preconditions.checkState(cohort == null, "cohort was already set");
680
681             setDoImmediateCommit(doImmediateCommit);
682
683             cohort = transaction.ready();
684
685             if(cohortDecorator != null) {
686                 // Call the hook for unit tests.
687                 cohort = cohortDecorator.decorate(transactionID, cohort);
688             }
689         }
690
691         boolean isReadyToCommit() {
692             return replySender != null;
693         }
694
695         boolean isExpired(long expireTimeInMillis) {
696             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
697         }
698
699         boolean isDoImmediateCommit() {
700             return doImmediateCommit;
701         }
702
703         void setDoImmediateCommit(boolean doImmediateCommit) {
704             this.doImmediateCommit = doImmediateCommit;
705         }
706
707         ActorRef getReplySender() {
708             return replySender;
709         }
710
711         void setReplySender(ActorRef replySender) {
712             this.replySender = replySender;
713         }
714
715         Shard getShard() {
716             return shard;
717         }
718
719         void setShard(Shard shard) {
720             this.shard = shard;
721         }
722
723
724         boolean isAborted() {
725             return aborted;
726         }
727
728         @Override
729         public String toString() {
730             StringBuilder builder = new StringBuilder();
731             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
732                     .append(doImmediateCommit).append("]");
733             return builder.toString();
734         }
735     }
736 }