Remove unused ShardCommitCoordinator#CohortEntry constructor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.Modification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.slf4j.Logger;
35
36 /**
37  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
38  *
39  * @author Thomas Pantelis
40  */
41 class ShardCommitCoordinator {
42
43     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
44     public interface CohortDecorator {
45         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
46     }
47
48     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
49
50     private CohortEntry currentCohortEntry;
51
52     private final ShardDataTree dataTree;
53
54     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
55     // since this should only be accessed on the shard's dispatcher.
56     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
57
58     private int queueCapacity;
59
60     private final Logger log;
61
62     private final String name;
63
64     private final long cacheExpiryTimeoutInMillis;
65
66     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
67     private CohortDecorator cohortDecorator;
68
69     private ReadyTransactionReply readyTransactionReply;
70
71     ShardCommitCoordinator(ShardDataTree dataTree,
72             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
73
74         this.queueCapacity = queueCapacity;
75         this.log = log;
76         this.name = name;
77         this.dataTree = Preconditions.checkNotNull(dataTree);
78         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
79     }
80
81     int getQueueSize() {
82         return queuedCohortEntries.size();
83     }
84
85     void setQueueCapacity(int queueCapacity) {
86         this.queueCapacity = queueCapacity;
87     }
88
89     private ReadyTransactionReply readyTransactionReply(Shard shard) {
90         if(readyTransactionReply == null) {
91             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
92         }
93
94         return readyTransactionReply;
95     }
96
97     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
98         if(queuedCohortEntries.size() < queueCapacity) {
99             queuedCohortEntries.offer(cohortEntry);
100
101             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
102                     queuedCohortEntries.size());
103
104             return true;
105         } else {
106             cohortCache.remove(cohortEntry.getTransactionID());
107
108             RuntimeException ex = new RuntimeException(
109                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
110                                   " capacity %d has been reached.",
111                                   name, cohortEntry.getTransactionID(), queueCapacity));
112             log.error(ex.getMessage());
113             sender.tell(new Status.Failure(ex), shard.self());
114             return false;
115         }
116     }
117
118     /**
119      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
120      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
121      *
122      * @param ready the ForwardedReadyTransaction message to process
123      * @param sender the sender of the message
124      * @param shard the transaction's shard actor
125      */
126     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
127         log.debug("{}: Readying transaction {}, client version {}", name,
128                 ready.getTransactionID(), ready.getTxnClientVersion());
129
130         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort());
131         cohortCache.put(ready.getTransactionID(), cohortEntry);
132
133         if(!queueCohortEntry(cohortEntry, sender, shard)) {
134             return;
135         }
136
137         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
138             // Return our actor path as we'll handle the three phase commit except if the Tx client
139             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
140             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
141             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
142             ActorRef replyActorPath = shard.self();
143             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
144                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
145                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
146                         ready.getTransactionID()));
147             }
148
149             ReadyTransactionReply readyTransactionReply =
150                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
151                             ready.getTxnClientVersion());
152             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
153                 readyTransactionReply, shard.self());
154         } else {
155             if(ready.isDoImmediateCommit()) {
156                 cohortEntry.setDoImmediateCommit(true);
157                 cohortEntry.setReplySender(sender);
158                 cohortEntry.setShard(shard);
159                 handleCanCommit(cohortEntry);
160             } else {
161                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
162                 // front-end so send back a ReadyTransactionReply with our actor path.
163                 sender.tell(readyTransactionReply(shard), shard.self());
164             }
165         }
166     }
167
168     /**
169      * This method handles a BatchedModifications message for a transaction being prepared directly on the
170      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
171      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
172      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
173      *
174      * @param batched the BatchedModifications message to process
175      * @param sender the sender of the message
176      * @param shard the transaction's shard actor
177      */
178     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
179         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
180         if(cohortEntry == null) {
181             cohortEntry = new CohortEntry(batched.getTransactionID(),
182                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
183                         batched.getTransactionChainID()));
184             cohortCache.put(batched.getTransactionID(), cohortEntry);
185         }
186
187         if(log.isDebugEnabled()) {
188             log.debug("{}: Applying {} batched modifications for Tx {}", name,
189                     batched.getModifications().size(), batched.getTransactionID());
190         }
191
192         cohortEntry.applyModifications(batched.getModifications());
193
194         if(batched.isReady()) {
195             if(cohortEntry.getLastBatchedModificationsException() != null) {
196                 cohortCache.remove(cohortEntry.getTransactionID());
197                 throw cohortEntry.getLastBatchedModificationsException();
198             }
199
200             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
201                 cohortCache.remove(cohortEntry.getTransactionID());
202                 throw new IllegalStateException(String.format(
203                         "The total number of batched messages received %d does not match the number sent %d",
204                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
205             }
206
207             if(!queueCohortEntry(cohortEntry, sender, shard)) {
208                 return;
209             }
210
211             if(log.isDebugEnabled()) {
212                 log.debug("{}: Readying Tx {}, client version {}", name,
213                         batched.getTransactionID(), batched.getVersion());
214             }
215
216             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
217
218             if(batched.isDoCommitOnReady()) {
219                 cohortEntry.setReplySender(sender);
220                 cohortEntry.setShard(shard);
221                 handleCanCommit(cohortEntry);
222             } else {
223                 sender.tell(readyTransactionReply(shard), shard.self());
224             }
225         } else {
226             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
227         }
228     }
229
230     /**
231      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
232      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
233      *
234      * @param message the ReadyLocalTransaction message to process
235      * @param sender the sender of the message
236      * @param shard the transaction's shard actor
237      */
238     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
239         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
240                 message.getTransactionID());
241         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
242         cohortCache.put(message.getTransactionID(), cohortEntry);
243         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
244
245         if(!queueCohortEntry(cohortEntry, sender, shard)) {
246             return;
247         }
248
249         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
250
251         if (message.isDoCommitOnReady()) {
252             cohortEntry.setReplySender(sender);
253             cohortEntry.setShard(shard);
254             handleCanCommit(cohortEntry);
255         } else {
256             sender.tell(readyTransactionReply(shard), shard.self());
257         }
258     }
259
260     private void handleCanCommit(CohortEntry cohortEntry) {
261         String transactionID = cohortEntry.getTransactionID();
262
263         cohortEntry.updateLastAccessTime();
264
265         if(currentCohortEntry != null) {
266             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
267             // queue and will get processed after all prior entries complete.
268
269             if(log.isDebugEnabled()) {
270                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
271                         name, currentCohortEntry.getTransactionID(), transactionID);
272             }
273
274             return;
275         }
276
277         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
278         // it the current entry and proceed with canCommit.
279         // Purposely checking reference equality here.
280         if(queuedCohortEntries.peek() == cohortEntry) {
281             currentCohortEntry = queuedCohortEntries.poll();
282             doCanCommit(currentCohortEntry);
283         } else {
284             if(log.isDebugEnabled()) {
285                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
286                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
287             }
288         }
289     }
290
291     /**
292      * This method handles the canCommit phase for a transaction.
293      *
294      * @param transactionID the ID of the transaction to canCommit
295      * @param sender the actor to which to send the response
296      * @param shard the transaction's shard actor
297      */
298     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
299         // Lookup the cohort entry that was cached previously (or should have been) by
300         // transactionReady (via the ForwardedReadyTransaction message).
301         final CohortEntry cohortEntry = cohortCache.get(transactionID);
302         if(cohortEntry == null) {
303             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
304             // between canCommit and ready and the entry was expired from the cache.
305             IllegalStateException ex = new IllegalStateException(
306                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
307             log.error(ex.getMessage());
308             sender.tell(new Status.Failure(ex), shard.self());
309             return;
310         }
311
312         cohortEntry.setReplySender(sender);
313         cohortEntry.setShard(shard);
314
315         handleCanCommit(cohortEntry);
316     }
317
318     private void doCanCommit(final CohortEntry cohortEntry) {
319         boolean canCommit = false;
320         try {
321             canCommit = cohortEntry.canCommit();
322
323             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
324
325             if(cohortEntry.isDoImmediateCommit()) {
326                 if(canCommit) {
327                     doCommit(cohortEntry);
328                 } else {
329                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
330                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
331                 }
332             } else {
333                 cohortEntry.getReplySender().tell(
334                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
335                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
336             }
337         } catch (Exception e) {
338             log.debug("{}: An exception occurred during canCommit", name, e);
339
340             Throwable failure = e;
341             if(e instanceof ExecutionException) {
342                 failure = e.getCause();
343             }
344
345             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
346         } finally {
347             if(!canCommit) {
348                 // Remove the entry from the cache now.
349                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
350             }
351         }
352     }
353
354     private boolean doCommit(CohortEntry cohortEntry) {
355         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
356
357         boolean success = false;
358
359         // We perform the preCommit phase here atomically with the commit phase. This is an
360         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
361         // coordination of preCommit across shards in case of failure but preCommit should not
362         // normally fail since we ensure only one concurrent 3-phase commit.
363
364         try {
365             cohortEntry.preCommit();
366
367             cohortEntry.getShard().continueCommit(cohortEntry);
368
369             cohortEntry.updateLastAccessTime();
370
371             success = true;
372         } catch (Exception e) {
373             log.error("{} An exception occurred while preCommitting transaction {}",
374                     name, cohortEntry.getTransactionID(), e);
375             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
376
377             currentTransactionComplete(cohortEntry.getTransactionID(), true);
378         }
379
380         return success;
381     }
382
383     /**
384      * This method handles the preCommit and commit phases for a transaction.
385      *
386      * @param transactionID the ID of the transaction to commit
387      * @param sender the actor to which to send the response
388      * @param shard the transaction's shard actor
389      * @return true if the transaction was successfully prepared, false otherwise.
390      */
391     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
392         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
393         // this transaction.
394         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
395         if(cohortEntry == null) {
396             // We're not the current Tx - the Tx was likely expired b/c it took too long in
397             // between the canCommit and commit messages.
398             IllegalStateException ex = new IllegalStateException(
399                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
400                             name, transactionID));
401             log.error(ex.getMessage());
402             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
403             return false;
404         }
405
406         cohortEntry.setReplySender(sender);
407         return doCommit(cohortEntry);
408     }
409
410     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
411         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
412         if(cohortEntry != null) {
413             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
414             // aborted during replication in which case we may still commit locally if replication
415             // succeeds.
416             currentTransactionComplete(transactionID, false);
417         } else {
418             cohortEntry = getAndRemoveCohortEntry(transactionID);
419         }
420
421         if(cohortEntry == null) {
422             return;
423         }
424
425         log.debug("{}: Aborting transaction {}", name, transactionID);
426
427         final ActorRef self = shard.getSelf();
428         try {
429             cohortEntry.abort();
430
431             shard.getShardMBean().incrementAbortTransactionsCount();
432
433             if(sender != null) {
434                 sender.tell(new AbortTransactionReply().toSerializable(), self);
435             }
436         } catch (Exception e) {
437             log.error("{}: An exception happened during abort", name, e);
438
439             if(sender != null) {
440                 sender.tell(new akka.actor.Status.Failure(e), self);
441             }
442         }
443     }
444
445     /**
446      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
447      * matches the current entry.
448      *
449      * @param transactionID the ID of the transaction
450      * @return the current CohortEntry or null if the given transaction ID does not match the
451      *         current entry.
452      */
453     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
454         if(isCurrentTransaction(transactionID)) {
455             return currentCohortEntry;
456         }
457
458         return null;
459     }
460
461     public CohortEntry getCurrentCohortEntry() {
462         return currentCohortEntry;
463     }
464
465     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
466         return cohortCache.remove(transactionID);
467     }
468
469     public boolean isCurrentTransaction(String transactionID) {
470         return currentCohortEntry != null &&
471                 currentCohortEntry.getTransactionID().equals(transactionID);
472     }
473
474     /**
475      * This method is called when a transaction is complete, successful or not. If the given
476      * given transaction ID matches the current in-progress transaction, the next cohort entry,
477      * if any, is dequeued and processed.
478      *
479      * @param transactionID the ID of the completed transaction
480      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
481      *        the cache.
482      */
483     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
484         if(removeCohortEntry) {
485             cohortCache.remove(transactionID);
486         }
487
488         if(isCurrentTransaction(transactionID)) {
489             currentCohortEntry = null;
490
491             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
492
493             maybeProcessNextCohortEntry();
494         }
495     }
496
497     private void maybeProcessNextCohortEntry() {
498         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
499         // clean out expired entries.
500         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
501         while(iter.hasNext()) {
502             CohortEntry next = iter.next();
503             if(next.isReadyToCommit()) {
504                 if(currentCohortEntry == null) {
505                     if(log.isDebugEnabled()) {
506                         log.debug("{}: Next entry to canCommit {}", name, next);
507                     }
508
509                     iter.remove();
510                     currentCohortEntry = next;
511                     currentCohortEntry.updateLastAccessTime();
512                     doCanCommit(currentCohortEntry);
513                 }
514
515                 break;
516             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
517                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
518                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
519             } else if(!next.isAborted()) {
520                 break;
521             }
522
523             iter.remove();
524             cohortCache.remove(next.getTransactionID());
525         }
526     }
527
528     void cleanupExpiredCohortEntries() {
529         maybeProcessNextCohortEntry();
530     }
531
532     @VisibleForTesting
533     void setCohortDecorator(CohortDecorator cohortDecorator) {
534         this.cohortDecorator = cohortDecorator;
535     }
536
537     static class CohortEntry {
538         private final String transactionID;
539         private ShardDataTreeCohort cohort;
540         private final ReadWriteShardDataTreeTransaction transaction;
541         private RuntimeException lastBatchedModificationsException;
542         private ActorRef replySender;
543         private Shard shard;
544         private boolean doImmediateCommit;
545         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
546         private int totalBatchedModificationsReceived;
547         private boolean aborted;
548
549         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
550             this.transaction = Preconditions.checkNotNull(transaction);
551             this.transactionID = transactionID;
552         }
553
554         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
555             this.transactionID = transactionID;
556             this.cohort = cohort;
557             this.transaction = null;
558         }
559
560         void updateLastAccessTime() {
561             lastAccessTimer.reset();
562             lastAccessTimer.start();
563         }
564
565         String getTransactionID() {
566             return transactionID;
567         }
568
569         DataTreeCandidate getCandidate() {
570             return cohort.getCandidate();
571         }
572
573         int getTotalBatchedModificationsReceived() {
574             return totalBatchedModificationsReceived;
575         }
576
577         RuntimeException getLastBatchedModificationsException() {
578             return lastBatchedModificationsException;
579         }
580
581         void applyModifications(Iterable<Modification> modifications) {
582             totalBatchedModificationsReceived++;
583             if(lastBatchedModificationsException == null) {
584                 for (Modification modification : modifications) {
585                         try {
586                             modification.apply(transaction.getSnapshot());
587                         } catch (RuntimeException e) {
588                             lastBatchedModificationsException = e;
589                             throw e;
590                         }
591                 }
592             }
593         }
594
595         boolean canCommit() throws InterruptedException, ExecutionException {
596             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
597             // about possibly accessing our state on a different thread outside of our dispatcher.
598             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
599             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
600             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
601             return cohort.canCommit().get();
602         }
603
604         void preCommit() throws InterruptedException, ExecutionException {
605             cohort.preCommit().get();
606         }
607
608         void commit() throws InterruptedException, ExecutionException {
609             cohort.commit().get();
610         }
611
612         void abort() throws InterruptedException, ExecutionException {
613             aborted = true;
614             cohort.abort().get();
615         }
616
617         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
618             Preconditions.checkState(cohort == null, "cohort was already set");
619
620             setDoImmediateCommit(doImmediateCommit);
621
622             cohort = transaction.ready();
623
624             if(cohortDecorator != null) {
625                 // Call the hook for unit tests.
626                 cohort = cohortDecorator.decorate(transactionID, cohort);
627             }
628         }
629
630         boolean isReadyToCommit() {
631             return replySender != null;
632         }
633
634         boolean isExpired(long expireTimeInMillis) {
635             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
636         }
637
638         boolean isDoImmediateCommit() {
639             return doImmediateCommit;
640         }
641
642         void setDoImmediateCommit(boolean doImmediateCommit) {
643             this.doImmediateCommit = doImmediateCommit;
644         }
645
646         ActorRef getReplySender() {
647             return replySender;
648         }
649
650         void setReplySender(ActorRef replySender) {
651             this.replySender = replySender;
652         }
653
654         Shard getShard() {
655             return shard;
656         }
657
658         void setShard(Shard shard) {
659             this.shard = shard;
660         }
661
662
663         boolean isAborted() {
664             return aborted;
665         }
666
667         @Override
668         public String toString() {
669             StringBuilder builder = new StringBuilder();
670             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
671                     .append(doImmediateCommit).append("]");
672             return builder.toString();
673         }
674     }
675 }