7c45bd0702db7c07fd6d6a2037c1721204415eab
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.Modification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.slf4j.Logger;
35
36 /**
37  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
38  *
39  * @author Thomas Pantelis
40  */
41 class ShardCommitCoordinator {
42
    /**
     * Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     * When a decorator is installed, it is invoked as a transaction is readied and the cohort it
     * returns is used in place of the actual one.
     */
    public interface CohortDecorator {
        /**
         * @param transactionID the ID of the transaction being readied
         * @param actual the cohort produced by readying the transaction
         * @return the cohort to use for the transaction's commit
         */
        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
    }
47
    // Cohort entries keyed by transaction ID. Holds entries that are still receiving batched
    // modifications as well as entries that have been readied and queued.
    private final Map<String, CohortEntry> cohortCache = new HashMap<>();

    // The entry whose commit is currently in progress, or null when no commit is in progress.
    private CohortEntry currentCohortEntry;

    private final ShardDataTree dataTree;

    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
    // since this should only be accessed on the shard's dispatcher.
    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();

    // Maximum number of entries allowed in queuedCohortEntries; enforced when an entry is enqueued.
    private int queueCapacity;

    private final Logger log;

    // Prefix used to identify this coordinator in log and error messages.
    private final String name;

    // Time in milliseconds since an entry's last access after which a queued entry that is not yet
    // ready to commit is dropped from the queue and the cache.
    private final long cacheExpiryTimeoutInMillis;

    // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
    private CohortDecorator cohortDecorator;

    // Lazily created reply carrying the shard's serialized actor path; created once and reused.
    private ReadyTransactionReply readyTransactionReply;
70
71     ShardCommitCoordinator(ShardDataTree dataTree,
72             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
73
74         this.queueCapacity = queueCapacity;
75         this.log = log;
76         this.name = name;
77         this.dataTree = Preconditions.checkNotNull(dataTree);
78         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
79     }
80
81     int getQueueSize() {
82         return queuedCohortEntries.size();
83     }
84
    /**
     * Sets the maximum number of entries allowed in the commit queue. The new value is applied to
     * subsequent enqueue attempts; entries already queued are not touched here.
     *
     * @param queueCapacity the new maximum queue size
     */
    void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
88
89     private ReadyTransactionReply readyTransactionReply(Shard shard) {
90         if(readyTransactionReply == null) {
91             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
92         }
93
94         return readyTransactionReply;
95     }
96
97     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
98         if(queuedCohortEntries.size() < queueCapacity) {
99             queuedCohortEntries.offer(cohortEntry);
100
101             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
102                     queuedCohortEntries.size());
103
104             return true;
105         } else {
106             cohortCache.remove(cohortEntry.getTransactionID());
107
108             RuntimeException ex = new RuntimeException(
109                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
110                                   " capacity %d has been reached.",
111                                   name, cohortEntry.getTransactionID(), queueCapacity));
112             log.error(ex.getMessage());
113             sender.tell(new Status.Failure(ex), shard.self());
114             return false;
115         }
116     }
117
118     /**
119      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
120      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
121      *
122      * @param ready the ForwardedReadyTransaction message to process
123      * @param sender the sender of the message
124      * @param shard the transaction's shard actor
125      */
126     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
127         log.debug("{}: Readying transaction {}, client version {}", name,
128                 ready.getTransactionID(), ready.getTxnClientVersion());
129
130         ShardDataTreeCohort cohort = ready.getTransaction().ready();
131         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
132         cohortCache.put(ready.getTransactionID(), cohortEntry);
133
134         if(!queueCohortEntry(cohortEntry, sender, shard)) {
135             return;
136         }
137
138         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
139             // Return our actor path as we'll handle the three phase commit except if the Tx client
140             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
141             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
142             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
143             ActorRef replyActorPath = shard.self();
144             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
145                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
146                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
147                         ready.getTransactionID()));
148             }
149
150             ReadyTransactionReply readyTransactionReply =
151                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
152                             ready.getTxnClientVersion());
153             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
154                 readyTransactionReply, shard.self());
155         } else {
156             if(ready.isDoImmediateCommit()) {
157                 cohortEntry.setDoImmediateCommit(true);
158                 cohortEntry.setReplySender(sender);
159                 cohortEntry.setShard(shard);
160                 handleCanCommit(cohortEntry);
161             } else {
162                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
163                 // front-end so send back a ReadyTransactionReply with our actor path.
164                 sender.tell(readyTransactionReply(shard), shard.self());
165             }
166         }
167     }
168
169     /**
170      * This method handles a BatchedModifications message for a transaction being prepared directly on the
171      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
172      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
173      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
174      *
175      * @param batched the BatchedModifications message to process
176      * @param sender the sender of the message
177      * @param shard the transaction's shard actor
178      */
179     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
180         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
181         if(cohortEntry == null) {
182             cohortEntry = new CohortEntry(batched.getTransactionID(),
183                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
184                         batched.getTransactionChainID()));
185             cohortCache.put(batched.getTransactionID(), cohortEntry);
186         }
187
188         if(log.isDebugEnabled()) {
189             log.debug("{}: Applying {} batched modifications for Tx {}", name,
190                     batched.getModifications().size(), batched.getTransactionID());
191         }
192
193         cohortEntry.applyModifications(batched.getModifications());
194
195         if(batched.isReady()) {
196             if(cohortEntry.getLastBatchedModificationsException() != null) {
197                 cohortCache.remove(cohortEntry.getTransactionID());
198                 throw cohortEntry.getLastBatchedModificationsException();
199             }
200
201             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
202                 cohortCache.remove(cohortEntry.getTransactionID());
203                 throw new IllegalStateException(String.format(
204                         "The total number of batched messages received %d does not match the number sent %d",
205                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
206             }
207
208             if(!queueCohortEntry(cohortEntry, sender, shard)) {
209                 return;
210             }
211
212             if(log.isDebugEnabled()) {
213                 log.debug("{}: Readying Tx {}, client version {}", name,
214                         batched.getTransactionID(), batched.getVersion());
215             }
216
217             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
218
219             if(batched.isDoCommitOnReady()) {
220                 cohortEntry.setReplySender(sender);
221                 cohortEntry.setShard(shard);
222                 handleCanCommit(cohortEntry);
223             } else {
224                 sender.tell(readyTransactionReply(shard), shard.self());
225             }
226         } else {
227             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
228         }
229     }
230
231     /**
232      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
233      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
234      *
235      * @param message the ReadyLocalTransaction message to process
236      * @param sender the sender of the message
237      * @param shard the transaction's shard actor
238      */
239     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
240         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
241                 message.getTransactionID());
242         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
243         cohortCache.put(message.getTransactionID(), cohortEntry);
244         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
245
246         if(!queueCohortEntry(cohortEntry, sender, shard)) {
247             return;
248         }
249
250         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
251
252         if (message.isDoCommitOnReady()) {
253             cohortEntry.setReplySender(sender);
254             cohortEntry.setShard(shard);
255             handleCanCommit(cohortEntry);
256         } else {
257             sender.tell(readyTransactionReply(shard), shard.self());
258         }
259     }
260
261     private void handleCanCommit(CohortEntry cohortEntry) {
262         String transactionID = cohortEntry.getTransactionID();
263
264         cohortEntry.updateLastAccessTime();
265
266         if(currentCohortEntry != null) {
267             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
268             // queue and will get processed after all prior entries complete.
269
270             if(log.isDebugEnabled()) {
271                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
272                         name, currentCohortEntry.getTransactionID(), transactionID);
273             }
274
275             return;
276         }
277
278         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
279         // it the current entry and proceed with canCommit.
280         // Purposely checking reference equality here.
281         if(queuedCohortEntries.peek() == cohortEntry) {
282             currentCohortEntry = queuedCohortEntries.poll();
283             doCanCommit(currentCohortEntry);
284         } else {
285             if(log.isDebugEnabled()) {
286                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
287                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
288             }
289         }
290     }
291
292     /**
293      * This method handles the canCommit phase for a transaction.
294      *
295      * @param transactionID the ID of the transaction to canCommit
296      * @param sender the actor to which to send the response
297      * @param shard the transaction's shard actor
298      */
299     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
300         // Lookup the cohort entry that was cached previously (or should have been) by
301         // transactionReady (via the ForwardedReadyTransaction message).
302         final CohortEntry cohortEntry = cohortCache.get(transactionID);
303         if(cohortEntry == null) {
304             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
305             // between canCommit and ready and the entry was expired from the cache.
306             IllegalStateException ex = new IllegalStateException(
307                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
308             log.error(ex.getMessage());
309             sender.tell(new Status.Failure(ex), shard.self());
310             return;
311         }
312
313         cohortEntry.setReplySender(sender);
314         cohortEntry.setShard(shard);
315
316         handleCanCommit(cohortEntry);
317     }
318
319     private void doCanCommit(final CohortEntry cohortEntry) {
320         boolean canCommit = false;
321         try {
322             canCommit = cohortEntry.canCommit();
323
324             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
325
326             if(cohortEntry.isDoImmediateCommit()) {
327                 if(canCommit) {
328                     doCommit(cohortEntry);
329                 } else {
330                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
331                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
332                 }
333             } else {
334                 cohortEntry.getReplySender().tell(
335                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
336                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
337             }
338         } catch (Exception e) {
339             log.debug("{}: An exception occurred during canCommit", name, e);
340
341             Throwable failure = e;
342             if(e instanceof ExecutionException) {
343                 failure = e.getCause();
344             }
345
346             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
347         } finally {
348             if(!canCommit) {
349                 // Remove the entry from the cache now.
350                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
351             }
352         }
353     }
354
355     private boolean doCommit(CohortEntry cohortEntry) {
356         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
357
358         boolean success = false;
359
360         // We perform the preCommit phase here atomically with the commit phase. This is an
361         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
362         // coordination of preCommit across shards in case of failure but preCommit should not
363         // normally fail since we ensure only one concurrent 3-phase commit.
364
365         try {
366             cohortEntry.preCommit();
367
368             cohortEntry.getShard().continueCommit(cohortEntry);
369
370             cohortEntry.updateLastAccessTime();
371
372             success = true;
373         } catch (Exception e) {
374             log.error("{} An exception occurred while preCommitting transaction {}",
375                     name, cohortEntry.getTransactionID(), e);
376             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
377
378             currentTransactionComplete(cohortEntry.getTransactionID(), true);
379         }
380
381         return success;
382     }
383
384     /**
385      * This method handles the preCommit and commit phases for a transaction.
386      *
387      * @param transactionID the ID of the transaction to commit
388      * @param sender the actor to which to send the response
389      * @param shard the transaction's shard actor
390      * @return true if the transaction was successfully prepared, false otherwise.
391      */
392     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
393         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
394         // this transaction.
395         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
396         if(cohortEntry == null) {
397             // We're not the current Tx - the Tx was likely expired b/c it took too long in
398             // between the canCommit and commit messages.
399             IllegalStateException ex = new IllegalStateException(
400                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
401                             name, transactionID));
402             log.error(ex.getMessage());
403             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
404             return false;
405         }
406
407         cohortEntry.setReplySender(sender);
408         return doCommit(cohortEntry);
409     }
410
411     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
412         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
413         if(cohortEntry != null) {
414             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
415             // aborted during replication in which case we may still commit locally if replication
416             // succeeds.
417             currentTransactionComplete(transactionID, false);
418         } else {
419             cohortEntry = getAndRemoveCohortEntry(transactionID);
420         }
421
422         if(cohortEntry == null) {
423             return;
424         }
425
426         log.debug("{}: Aborting transaction {}", name, transactionID);
427
428         final ActorRef self = shard.getSelf();
429         try {
430             cohortEntry.abort();
431
432             shard.getShardMBean().incrementAbortTransactionsCount();
433
434             if(sender != null) {
435                 sender.tell(new AbortTransactionReply().toSerializable(), self);
436             }
437         } catch (Exception e) {
438             log.error("{}: An exception happened during abort", name, e);
439
440             if(sender != null) {
441                 sender.tell(new akka.actor.Status.Failure(e), self);
442             }
443         }
444     }
445
446     /**
447      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
448      * matches the current entry.
449      *
450      * @param transactionID the ID of the transaction
451      * @return the current CohortEntry or null if the given transaction ID does not match the
452      *         current entry.
453      */
454     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
455         if(isCurrentTransaction(transactionID)) {
456             return currentCohortEntry;
457         }
458
459         return null;
460     }
461
    /**
     * @return the entry for the Tx commit currently in progress, or null if no commit is in progress
     */
    public CohortEntry getCurrentCohortEntry() {
        return currentCohortEntry;
    }
465
    /**
     * Removes the entry for the given transaction from the cache. The commit queue is not modified.
     *
     * @param transactionID the ID of the transaction
     * @return the removed CohortEntry or null if no entry was cached for the transaction ID
     */
    public CohortEntry getAndRemoveCohortEntry(String transactionID) {
        return cohortCache.remove(transactionID);
    }
469
470     public boolean isCurrentTransaction(String transactionID) {
471         return currentCohortEntry != null &&
472                 currentCohortEntry.getTransactionID().equals(transactionID);
473     }
474
475     /**
476      * This method is called when a transaction is complete, successful or not. If the given
477      * given transaction ID matches the current in-progress transaction, the next cohort entry,
478      * if any, is dequeued and processed.
479      *
480      * @param transactionID the ID of the completed transaction
481      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
482      *        the cache.
483      */
484     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
485         if(removeCohortEntry) {
486             cohortCache.remove(transactionID);
487         }
488
489         if(isCurrentTransaction(transactionID)) {
490             currentCohortEntry = null;
491
492             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
493
494             maybeProcessNextCohortEntry();
495         }
496     }
497
    /**
     * Walks the commit queue from the head. Entries that have expired or were aborted before
     * becoming ready to commit are dropped from both the queue and the cache. The walk stops at the
     * first entry that is ready to commit, or at the first entry that is still pending (neither
     * expired nor aborted). A ready entry is started only if no commit is currently in progress.
     */
    private void maybeProcessNextCohortEntry() {
        // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
        // clean out expired entries.
        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
        while(iter.hasNext()) {
            CohortEntry next = iter.next();
            if(next.isReadyToCommit()) {
                if(currentCohortEntry == null) {
                    if(log.isDebugEnabled()) {
                        log.debug("{}: Next entry to canCommit {}", name, next);
                    }

                    // Dequeue before running canCommit: doCanCommit may complete the transaction and
                    // re-enter this method, and this iterator is not used again after the call.
                    iter.remove();
                    currentCohortEntry = next;
                    currentCohortEntry.updateLastAccessTime();
                    doCanCommit(currentCohortEntry);
                }

                // Either the entry was just started or another commit is in progress - in both cases
                // nothing further down the queue can proceed yet.
                break;
            } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
                // Falls through to the removal below.
                log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
                        name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
            } else if(!next.isAborted()) {
                // The head entry is still waiting for its canCommit - keep it and stop here.
                break;
            }

            // Reached only for expired or aborted entries.
            iter.remove();
            cohortCache.remove(next.getTransactionID());
        }
    }
528
    /**
     * Cleans out expired entries from the head of the commit queue. This delegates to the same
     * routine that processes the next queued entry, so a ready entry at the head may also be
     * started if no commit is in progress.
     */
    void cleanupExpiredCohortEntries() {
        maybeProcessNextCohortEntry();
    }
532
    /**
     * Installs the unit test hook that is passed to cohort entries when transactions prepared via
     * BatchedModifications are readied.
     *
     * @param cohortDecorator the decorator to use, or null for none
     */
    @VisibleForTesting
    void setCohortDecorator(CohortDecorator cohortDecorator) {
        this.cohortDecorator = cohortDecorator;
    }
537
538     static class CohortEntry {
539         private final String transactionID;
540         private ShardDataTreeCohort cohort;
541         private final ReadWriteShardDataTreeTransaction transaction;
542         private RuntimeException lastBatchedModificationsException;
543         private ActorRef replySender;
544         private Shard shard;
545         private boolean doImmediateCommit;
546         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
547         private int totalBatchedModificationsReceived;
548         private boolean aborted;
549
550         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
551             this.transaction = Preconditions.checkNotNull(transaction);
552             this.transactionID = transactionID;
553         }
554
555         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
556             this.transactionID = transactionID;
557             this.cohort = cohort;
558             this.transaction = null;
559         }
560
561         void updateLastAccessTime() {
562             lastAccessTimer.reset();
563             lastAccessTimer.start();
564         }
565
566         String getTransactionID() {
567             return transactionID;
568         }
569
570         DataTreeCandidate getCandidate() {
571             return cohort.getCandidate();
572         }
573
574         int getTotalBatchedModificationsReceived() {
575             return totalBatchedModificationsReceived;
576         }
577
578         RuntimeException getLastBatchedModificationsException() {
579             return lastBatchedModificationsException;
580         }
581
582         void applyModifications(Iterable<Modification> modifications) {
583             totalBatchedModificationsReceived++;
584             if(lastBatchedModificationsException == null) {
585                 for (Modification modification : modifications) {
586                         try {
587                             modification.apply(transaction.getSnapshot());
588                         } catch (RuntimeException e) {
589                             lastBatchedModificationsException = e;
590                             throw e;
591                         }
592                 }
593             }
594         }
595
596         boolean canCommit() throws InterruptedException, ExecutionException {
597             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
598             // about possibly accessing our state on a different thread outside of our dispatcher.
599             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
600             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
601             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
602             return cohort.canCommit().get();
603         }
604
605         void preCommit() throws InterruptedException, ExecutionException {
606             cohort.preCommit().get();
607         }
608
609         void commit() throws InterruptedException, ExecutionException {
610             cohort.commit().get();
611         }
612
613         void abort() throws InterruptedException, ExecutionException {
614             aborted = true;
615             cohort.abort().get();
616         }
617
618         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
619             Preconditions.checkState(cohort == null, "cohort was already set");
620
621             setDoImmediateCommit(doImmediateCommit);
622
623             cohort = transaction.ready();
624
625             if(cohortDecorator != null) {
626                 // Call the hook for unit tests.
627                 cohort = cohortDecorator.decorate(transactionID, cohort);
628             }
629         }
630
631         boolean isReadyToCommit() {
632             return replySender != null;
633         }
634
635         boolean isExpired(long expireTimeInMillis) {
636             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
637         }
638
639         boolean isDoImmediateCommit() {
640             return doImmediateCommit;
641         }
642
643         void setDoImmediateCommit(boolean doImmediateCommit) {
644             this.doImmediateCommit = doImmediateCommit;
645         }
646
647         ActorRef getReplySender() {
648             return replySender;
649         }
650
651         void setReplySender(ActorRef replySender) {
652             this.replySender = replySender;
653         }
654
655         Shard getShard() {
656             return shard;
657         }
658
659         void setShard(Shard shard) {
660             this.shard = shard;
661         }
662
663
664         boolean isAborted() {
665             return aborted;
666         }
667
668         @Override
669         public String toString() {
670             StringBuilder builder = new StringBuilder();
671             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
672                     .append(doImmediateCommit).append("]");
673             return builder.toString();
674         }
675     }
676 }