Fix forwarding in Shard.handleBatchedModifications
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Queue;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.modification.Modification;
35 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
38 import org.slf4j.Logger;
39
40 /**
41  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
42  *
43  * @author Thomas Pantelis
44  */
45 class ShardCommitCoordinator {
46
47     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
48     public interface CohortDecorator {
49         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
50     }
51
52     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
53
54     private CohortEntry currentCohortEntry;
55
56     private final ShardDataTree dataTree;
57
58     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
59     // since this should only be accessed on the shard's dispatcher.
60     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
61
62     private int queueCapacity;
63
64     private final Logger log;
65
66     private final String name;
67
68     private final long cacheExpiryTimeoutInMillis;
69
70     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
71     private CohortDecorator cohortDecorator;
72
73     private ReadyTransactionReply readyTransactionReply;
74
75     private Runnable runOnPendingTransactionsComplete;
76
77     ShardCommitCoordinator(ShardDataTree dataTree,
78             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
79
80         this.queueCapacity = queueCapacity;
81         this.log = log;
82         this.name = name;
83         this.dataTree = Preconditions.checkNotNull(dataTree);
84         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
85     }
86
87     int getQueueSize() {
88         return queuedCohortEntries.size();
89     }
90
91     int getCohortCacheSize() {
92         return cohortCache.size();
93     }
94
95     void setQueueCapacity(int queueCapacity) {
96         this.queueCapacity = queueCapacity;
97     }
98
99     private ReadyTransactionReply readyTransactionReply(Shard shard) {
100         if(readyTransactionReply == null) {
101             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
102         }
103
104         return readyTransactionReply;
105     }
106
107     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
108         if(queuedCohortEntries.size() < queueCapacity) {
109             queuedCohortEntries.offer(cohortEntry);
110
111             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
112                     queuedCohortEntries.size());
113
114             return true;
115         } else {
116             cohortCache.remove(cohortEntry.getTransactionID());
117
118             RuntimeException ex = new RuntimeException(
119                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
120                                   " capacity %d has been reached.",
121                                   name, cohortEntry.getTransactionID(), queueCapacity));
122             log.error(ex.getMessage());
123             sender.tell(new Failure(ex), shard.self());
124             return false;
125         }
126     }
127
128     /**
129      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
130      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
131      *
132      * @param ready the ForwardedReadyTransaction message to process
133      * @param sender the sender of the message
134      * @param shard the transaction's shard actor
135      */
136     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
137         log.debug("{}: Readying transaction {}, client version {}", name,
138                 ready.getTransactionID(), ready.getTxnClientVersion());
139
140         ShardDataTreeCohort cohort = ready.getTransaction().ready();
141         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
142         cohortCache.put(ready.getTransactionID(), cohortEntry);
143
144         if(!queueCohortEntry(cohortEntry, sender, shard)) {
145             return;
146         }
147
148         if(ready.isDoImmediateCommit()) {
149             cohortEntry.setDoImmediateCommit(true);
150             cohortEntry.setReplySender(sender);
151             cohortEntry.setShard(shard);
152             handleCanCommit(cohortEntry);
153         } else {
154             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
155             // front-end so send back a ReadyTransactionReply with our actor path.
156             sender.tell(readyTransactionReply(shard), shard.self());
157         }
158     }
159
160     /**
161      * This method handles a BatchedModifications message for a transaction being prepared directly on the
162      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
163      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
164      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
165      *
166      * @param batched the BatchedModifications message to process
167      * @param sender the sender of the message
168      * @param shard the transaction's shard actor
169      */
170     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
171         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
172         if(cohortEntry == null) {
173             cohortEntry = new CohortEntry(batched.getTransactionID(),
174                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
175                         batched.getTransactionChainID()), batched.getVersion());
176             cohortCache.put(batched.getTransactionID(), cohortEntry);
177         }
178
179         if(log.isDebugEnabled()) {
180             log.debug("{}: Applying {} batched modifications for Tx {}", name,
181                     batched.getModifications().size(), batched.getTransactionID());
182         }
183
184         cohortEntry.applyModifications(batched.getModifications());
185
186         if(batched.isReady()) {
187             if(cohortEntry.getLastBatchedModificationsException() != null) {
188                 cohortCache.remove(cohortEntry.getTransactionID());
189                 throw cohortEntry.getLastBatchedModificationsException();
190             }
191
192             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
193                 cohortCache.remove(cohortEntry.getTransactionID());
194                 throw new IllegalStateException(String.format(
195                         "The total number of batched messages received %d does not match the number sent %d",
196                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
197             }
198
199             if(!queueCohortEntry(cohortEntry, sender, shard)) {
200                 return;
201             }
202
203             if(log.isDebugEnabled()) {
204                 log.debug("{}: Readying Tx {}, client version {}", name,
205                         batched.getTransactionID(), batched.getVersion());
206             }
207
208             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
209
210             if(batched.isDoCommitOnReady()) {
211                 cohortEntry.setReplySender(sender);
212                 cohortEntry.setShard(shard);
213                 handleCanCommit(cohortEntry);
214             } else {
215                 sender.tell(readyTransactionReply(shard), shard.self());
216             }
217         } else {
218             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
219         }
220     }
221
222     /**
223      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
224      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
225      *
226      * @param message the ReadyLocalTransaction message to process
227      * @param sender the sender of the message
228      * @param shard the transaction's shard actor
229      */
230     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
231         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
232                 message.getTransactionID());
233         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
234                 DataStoreVersions.CURRENT_VERSION);
235         cohortCache.put(message.getTransactionID(), cohortEntry);
236         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
237
238         if(!queueCohortEntry(cohortEntry, sender, shard)) {
239             return;
240         }
241
242         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
243
244         if (message.isDoCommitOnReady()) {
245             cohortEntry.setReplySender(sender);
246             cohortEntry.setShard(shard);
247             handleCanCommit(cohortEntry);
248         } else {
249             sender.tell(readyTransactionReply(shard), shard.self());
250         }
251     }
252
253     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
254             final int maxModificationsPerBatch) {
255         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
256         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
257             return Collections.singletonList(from);
258         }
259
260         cohortEntry.applyModifications(from.getModifications());
261
262         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
263         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
264             @Override
265             protected BatchedModifications getModifications() {
266                 if(newModifications.isEmpty() ||
267                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
268                     newModifications.add(new BatchedModifications(from.getTransactionID(),
269                             from.getVersion(), from.getTransactionChainID()));
270                 }
271
272                 return newModifications.getLast();
273             }
274         });
275
276         BatchedModifications last = newModifications.getLast();
277         last.setDoCommitOnReady(from.isDoCommitOnReady());
278         last.setReady(from.isReady());
279         last.setTotalMessagesSent(newModifications.size());
280         return newModifications;
281     }
282
283     private void handleCanCommit(CohortEntry cohortEntry) {
284         String transactionID = cohortEntry.getTransactionID();
285
286         cohortEntry.updateLastAccessTime();
287
288         if(currentCohortEntry != null) {
289             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
290             // queue and will get processed after all prior entries complete.
291
292             if(log.isDebugEnabled()) {
293                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
294                         name, currentCohortEntry.getTransactionID(), transactionID);
295             }
296
297             return;
298         }
299
300         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
301         // it the current entry and proceed with canCommit.
302         // Purposely checking reference equality here.
303         if(queuedCohortEntries.peek() == cohortEntry) {
304             currentCohortEntry = queuedCohortEntries.poll();
305             doCanCommit(currentCohortEntry);
306         } else {
307             if(log.isDebugEnabled()) {
308                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
309                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
310             }
311         }
312     }
313
314     /**
315      * This method handles the canCommit phase for a transaction.
316      *
317      * @param transactionID the ID of the transaction to canCommit
318      * @param sender the actor to which to send the response
319      * @param shard the transaction's shard actor
320      */
321     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
322         // Lookup the cohort entry that was cached previously (or should have been) by
323         // transactionReady (via the ForwardedReadyTransaction message).
324         final CohortEntry cohortEntry = cohortCache.get(transactionID);
325         if(cohortEntry == null) {
326             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
327             // between canCommit and ready and the entry was expired from the cache.
328             IllegalStateException ex = new IllegalStateException(
329                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
330             log.error(ex.getMessage());
331             sender.tell(new Failure(ex), shard.self());
332             return;
333         }
334
335         cohortEntry.setReplySender(sender);
336         cohortEntry.setShard(shard);
337
338         handleCanCommit(cohortEntry);
339     }
340
341     private void doCanCommit(final CohortEntry cohortEntry) {
342         boolean canCommit = false;
343         try {
344             canCommit = cohortEntry.canCommit();
345
346             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
347
348             if(cohortEntry.isDoImmediateCommit()) {
349                 if(canCommit) {
350                     doCommit(cohortEntry);
351                 } else {
352                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
353                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
354                 }
355             } else {
356                 // FIXME - use caller's version
357                 cohortEntry.getReplySender().tell(
358                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
359                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
360                         cohortEntry.getShard().self());
361             }
362         } catch (Exception e) {
363             log.debug("{}: An exception occurred during canCommit", name, e);
364
365             Throwable failure = e;
366             if(e instanceof ExecutionException) {
367                 failure = e.getCause();
368             }
369
370             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
371         } finally {
372             if(!canCommit) {
373                 // Remove the entry from the cache now.
374                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
375             }
376         }
377     }
378
379     private boolean doCommit(CohortEntry cohortEntry) {
380         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
381
382         boolean success = false;
383
384         // We perform the preCommit phase here atomically with the commit phase. This is an
385         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
386         // coordination of preCommit across shards in case of failure but preCommit should not
387         // normally fail since we ensure only one concurrent 3-phase commit.
388
389         try {
390             cohortEntry.preCommit();
391
392             cohortEntry.getShard().continueCommit(cohortEntry);
393
394             cohortEntry.updateLastAccessTime();
395
396             success = true;
397         } catch (Exception e) {
398             log.error("{} An exception occurred while preCommitting transaction {}",
399                     name, cohortEntry.getTransactionID(), e);
400             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
401
402             currentTransactionComplete(cohortEntry.getTransactionID(), true);
403         }
404
405         return success;
406     }
407
408     /**
409      * This method handles the preCommit and commit phases for a transaction.
410      *
411      * @param transactionID the ID of the transaction to commit
412      * @param sender the actor to which to send the response
413      * @param shard the transaction's shard actor
414      * @return true if the transaction was successfully prepared, false otherwise.
415      */
416     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
417         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
418         // this transaction.
419         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
420         if(cohortEntry == null) {
421             // We're not the current Tx - the Tx was likely expired b/c it took too long in
422             // between the canCommit and commit messages.
423             IllegalStateException ex = new IllegalStateException(
424                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
425                             name, transactionID));
426             log.error(ex.getMessage());
427             sender.tell(new Failure(ex), shard.self());
428             return false;
429         }
430
431         cohortEntry.setReplySender(sender);
432         return doCommit(cohortEntry);
433     }
434
435     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
436         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
437         if(cohortEntry != null) {
438             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
439             // aborted during replication in which case we may still commit locally if replication
440             // succeeds.
441             currentTransactionComplete(transactionID, false);
442         } else {
443             cohortEntry = getAndRemoveCohortEntry(transactionID);
444         }
445
446         if(cohortEntry == null) {
447             return;
448         }
449
450         log.debug("{}: Aborting transaction {}", name, transactionID);
451
452         final ActorRef self = shard.getSelf();
453         try {
454             cohortEntry.abort();
455
456             shard.getShardMBean().incrementAbortTransactionsCount();
457
458             if(sender != null) {
459                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
460             }
461         } catch (Exception e) {
462             log.error("{}: An exception happened during abort", name, e);
463
464             if(sender != null) {
465                 sender.tell(new Failure(e), self);
466             }
467         }
468     }
469
470     void checkForExpiredTransactions(final long timeout, final Shard shard) {
471         CohortEntry cohortEntry = getCurrentCohortEntry();
472         if(cohortEntry != null) {
473             if(cohortEntry.isExpired(timeout)) {
474                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
475                         name, cohortEntry.getTransactionID(), timeout);
476
477                 handleAbort(cohortEntry.getTransactionID(), null, shard);
478             }
479         }
480
481         cleanupExpiredCohortEntries();
482     }
483
484     void abortPendingTransactions(final String reason, final Shard shard) {
485         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
486             return;
487         }
488
489         List<CohortEntry> cohortEntries = new ArrayList<>();
490
491         if(currentCohortEntry != null) {
492             cohortEntries.add(currentCohortEntry);
493             currentCohortEntry = null;
494         }
495
496         cohortEntries.addAll(queuedCohortEntries);
497         queuedCohortEntries.clear();
498
499         for(CohortEntry cohortEntry: cohortEntries) {
500             if(cohortEntry.getReplySender() != null) {
501                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
502             }
503         }
504     }
505
506     /**
507      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
508      * matches the current entry.
509      *
510      * @param transactionID the ID of the transaction
511      * @return the current CohortEntry or null if the given transaction ID does not match the
512      *         current entry.
513      */
514     CohortEntry getCohortEntryIfCurrent(String transactionID) {
515         if(isCurrentTransaction(transactionID)) {
516             return currentCohortEntry;
517         }
518
519         return null;
520     }
521
522     CohortEntry getCurrentCohortEntry() {
523         return currentCohortEntry;
524     }
525
526     CohortEntry getAndRemoveCohortEntry(String transactionID) {
527         return cohortCache.remove(transactionID);
528     }
529
530     boolean isCurrentTransaction(String transactionID) {
531         return currentCohortEntry != null &&
532                 currentCohortEntry.getTransactionID().equals(transactionID);
533     }
534
535     /**
536      * This method is called when a transaction is complete, successful or not. If the given
537      * given transaction ID matches the current in-progress transaction, the next cohort entry,
538      * if any, is dequeued and processed.
539      *
540      * @param transactionID the ID of the completed transaction
541      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
542      *        the cache.
543      */
544     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
545         if(removeCohortEntry) {
546             cohortCache.remove(transactionID);
547         }
548
549         if(isCurrentTransaction(transactionID)) {
550             currentCohortEntry = null;
551
552             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
553
554             maybeProcessNextCohortEntry();
555         }
556     }
557
558     private void maybeProcessNextCohortEntry() {
559         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
560         // clean out expired entries.
561         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
562         while(iter.hasNext()) {
563             CohortEntry next = iter.next();
564             if(next.isReadyToCommit()) {
565                 if(currentCohortEntry == null) {
566                     if(log.isDebugEnabled()) {
567                         log.debug("{}: Next entry to canCommit {}", name, next);
568                     }
569
570                     iter.remove();
571                     currentCohortEntry = next;
572                     currentCohortEntry.updateLastAccessTime();
573                     doCanCommit(currentCohortEntry);
574                 }
575
576                 break;
577             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
578                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
579                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
580             } else if(!next.isAborted()) {
581                 break;
582             }
583
584             iter.remove();
585             cohortCache.remove(next.getTransactionID());
586         }
587
588         maybeRunOperationOnPendingTransactionsComplete();
589     }
590
591     void cleanupExpiredCohortEntries() {
592         maybeProcessNextCohortEntry();
593     }
594
595     void setRunOnPendingTransactionsComplete(Runnable operation) {
596         runOnPendingTransactionsComplete = operation;
597         maybeRunOperationOnPendingTransactionsComplete();
598     }
599
600     private void maybeRunOperationOnPendingTransactionsComplete() {
601         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
602             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
603
604             runOnPendingTransactionsComplete.run();
605             runOnPendingTransactionsComplete = null;
606         }
607     }
608
609     @VisibleForTesting
610     void setCohortDecorator(CohortDecorator cohortDecorator) {
611         this.cohortDecorator = cohortDecorator;
612     }
613
614     static class CohortEntry {
615         private final String transactionID;
616         private ShardDataTreeCohort cohort;
617         private final ReadWriteShardDataTreeTransaction transaction;
618         private RuntimeException lastBatchedModificationsException;
619         private ActorRef replySender;
620         private Shard shard;
621         private boolean doImmediateCommit;
622         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
623         private int totalBatchedModificationsReceived;
624         private boolean aborted;
625         private final short clientVersion;
626
627         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
628             this.transaction = Preconditions.checkNotNull(transaction);
629             this.transactionID = transactionID;
630             this.clientVersion = clientVersion;
631         }
632
633         CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
634             this.transactionID = transactionID;
635             this.cohort = cohort;
636             this.transaction = null;
637             this.clientVersion = clientVersion;
638         }
639
640         void updateLastAccessTime() {
641             lastAccessTimer.reset();
642             lastAccessTimer.start();
643         }
644
645         String getTransactionID() {
646             return transactionID;
647         }
648
649         short getClientVersion() {
650             return clientVersion;
651         }
652
653         DataTreeCandidate getCandidate() {
654             return cohort.getCandidate();
655         }
656
657         ReadWriteShardDataTreeTransaction getTransaction() {
658             return transaction;
659         }
660
661         int getTotalBatchedModificationsReceived() {
662             return totalBatchedModificationsReceived;
663         }
664
665         RuntimeException getLastBatchedModificationsException() {
666             return lastBatchedModificationsException;
667         }
668
669         void applyModifications(Iterable<Modification> modifications) {
670             totalBatchedModificationsReceived++;
671             if(lastBatchedModificationsException == null) {
672                 for (Modification modification : modifications) {
673                         try {
674                             modification.apply(transaction.getSnapshot());
675                         } catch (RuntimeException e) {
676                             lastBatchedModificationsException = e;
677                             throw e;
678                         }
679                 }
680             }
681         }
682
683         boolean canCommit() throws InterruptedException, ExecutionException {
684             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
685             // about possibly accessing our state on a different thread outside of our dispatcher.
686             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
687             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
688             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
689             return cohort.canCommit().get();
690         }
691
692         void preCommit() throws InterruptedException, ExecutionException {
693             cohort.preCommit().get();
694         }
695
696         void commit() throws InterruptedException, ExecutionException {
697             cohort.commit().get();
698         }
699
700         void abort() throws InterruptedException, ExecutionException {
701             aborted = true;
702             cohort.abort().get();
703         }
704
705         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
706             Preconditions.checkState(cohort == null, "cohort was already set");
707
708             setDoImmediateCommit(doImmediateCommit);
709
710             cohort = transaction.ready();
711
712             if(cohortDecorator != null) {
713                 // Call the hook for unit tests.
714                 cohort = cohortDecorator.decorate(transactionID, cohort);
715             }
716         }
717
718         boolean isReadyToCommit() {
719             return replySender != null;
720         }
721
722         boolean isExpired(long expireTimeInMillis) {
723             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
724         }
725
726         boolean isDoImmediateCommit() {
727             return doImmediateCommit;
728         }
729
730         void setDoImmediateCommit(boolean doImmediateCommit) {
731             this.doImmediateCommit = doImmediateCommit;
732         }
733
734         ActorRef getReplySender() {
735             return replySender;
736         }
737
738         void setReplySender(ActorRef replySender) {
739             this.replySender = replySender;
740         }
741
742         Shard getShard() {
743             return shard;
744         }
745
746         void setShard(Shard shard) {
747             this.shard = shard;
748         }
749
750
751         boolean isAborted() {
752             return aborted;
753         }
754
755         @Override
756         public String toString() {
757             StringBuilder builder = new StringBuilder();
758             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
759                     .append(doImmediateCommit).append("]");
760             return builder.toString();
761         }
762     }
763 }