BUG 5656 : Entity ownership candidates not removed consistently on leadership change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Queue;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.modification.Modification;
38 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
39 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
42 import org.slf4j.Logger;
43
44 /**
45  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
46  *
47  * @author Thomas Pantelis
48  */
49 class ShardCommitCoordinator {
50
51     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
52     public interface CohortDecorator {
53         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
54     }
55
56     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
57
58     private CohortEntry currentCohortEntry;
59
60     private final ShardDataTree dataTree;
61
62     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
63     // since this should only be accessed on the shard's dispatcher.
64     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
65
66     private int queueCapacity;
67
68     private final Logger log;
69
70     private final String name;
71
72     private final long cacheExpiryTimeoutInMillis;
73
74     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
75     private CohortDecorator cohortDecorator;
76
77     private ReadyTransactionReply readyTransactionReply;
78
79     private Runnable runOnPendingTransactionsComplete;
80
/**
 * Creates a commit coordinator for a shard.
 *
 * @param dataTree the shard's data tree; must not be null
 * @param cacheExpiryTimeoutInMillis the age, in milliseconds, passed to the expiry checks on
 *        queued cohort entries
 * @param queueCapacity the maximum number of cohort entries allowed in the commit queue
 * @param log the logger to write to
 * @param name the name used as a prefix in log and error messages
 */
ShardCommitCoordinator(ShardDataTree dataTree,
        long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
    this.dataTree = Preconditions.checkNotNull(dataTree);
    this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
    this.queueCapacity = queueCapacity;
    this.log = log;
    this.name = name;
}
90
91     int getQueueSize() {
92         return queuedCohortEntries.size();
93     }
94
95     int getCohortCacheSize() {
96         return cohortCache.size();
97     }
98
99     void setQueueCapacity(int queueCapacity) {
100         this.queueCapacity = queueCapacity;
101     }
102
103     private ReadyTransactionReply readyTransactionReply(Shard shard) {
104         if(readyTransactionReply == null) {
105             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
106         }
107
108         return readyTransactionReply;
109     }
110
111     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
112         if(queuedCohortEntries.size() < queueCapacity) {
113             queuedCohortEntries.offer(cohortEntry);
114
115             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
116                     queuedCohortEntries.size());
117
118             return true;
119         } else {
120             cohortCache.remove(cohortEntry.getTransactionID());
121
122             RuntimeException ex = new RuntimeException(
123                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
124                                   " capacity %d has been reached.",
125                                   name, cohortEntry.getTransactionID(), queueCapacity));
126             log.error(ex.getMessage());
127             sender.tell(new Failure(ex), shard.self());
128             return false;
129         }
130     }
131
132     /**
133      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
134      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
135      *
136      * @param ready the ForwardedReadyTransaction message to process
137      * @param sender the sender of the message
138      * @param shard the transaction's shard actor
139      */
140     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
141         log.debug("{}: Readying transaction {}, client version {}", name,
142                 ready.getTransactionID(), ready.getTxnClientVersion());
143
144         ShardDataTreeCohort cohort = ready.getTransaction().ready();
145         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
146         cohortCache.put(ready.getTransactionID(), cohortEntry);
147
148         if(!queueCohortEntry(cohortEntry, sender, shard)) {
149             return;
150         }
151
152         if(ready.isDoImmediateCommit()) {
153             cohortEntry.setDoImmediateCommit(true);
154             cohortEntry.setReplySender(sender);
155             cohortEntry.setShard(shard);
156             handleCanCommit(cohortEntry);
157         } else {
158             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
159             // front-end so send back a ReadyTransactionReply with our actor path.
160             sender.tell(readyTransactionReply(shard), shard.self());
161         }
162     }
163
164     /**
165      * This method handles a BatchedModifications message for a transaction being prepared directly on the
166      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
167      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
168      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
169      *
170      * @param batched the BatchedModifications message to process
171      * @param sender the sender of the message
172      * @param shard the transaction's shard actor
173      */
174     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
175         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
176         if(cohortEntry == null) {
177             cohortEntry = new CohortEntry(batched.getTransactionID(),
178                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
179                         batched.getTransactionChainID()), batched.getVersion());
180             cohortCache.put(batched.getTransactionID(), cohortEntry);
181         }
182
183         if(log.isDebugEnabled()) {
184             log.debug("{}: Applying {} batched modifications for Tx {}", name,
185                     batched.getModifications().size(), batched.getTransactionID());
186         }
187
188         cohortEntry.applyModifications(batched.getModifications());
189
190         if(batched.isReady()) {
191             if(cohortEntry.getLastBatchedModificationsException() != null) {
192                 cohortCache.remove(cohortEntry.getTransactionID());
193                 throw cohortEntry.getLastBatchedModificationsException();
194             }
195
196             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
197                 cohortCache.remove(cohortEntry.getTransactionID());
198                 throw new IllegalStateException(String.format(
199                         "The total number of batched messages received %d does not match the number sent %d",
200                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
201             }
202
203             if(!queueCohortEntry(cohortEntry, sender, shard)) {
204                 return;
205             }
206
207             if(log.isDebugEnabled()) {
208                 log.debug("{}: Readying Tx {}, client version {}", name,
209                         batched.getTransactionID(), batched.getVersion());
210             }
211
212             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
213
214             if(batched.isDoCommitOnReady()) {
215                 cohortEntry.setReplySender(sender);
216                 cohortEntry.setShard(shard);
217                 handleCanCommit(cohortEntry);
218             } else {
219                 sender.tell(readyTransactionReply(shard), shard.self());
220             }
221         } else {
222             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
223         }
224     }
225
226     /**
227      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
228      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
229      *
230      * @param message the ReadyLocalTransaction message to process
231      * @param sender the sender of the message
232      * @param shard the transaction's shard actor
233      */
234     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
235         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
236                 message.getTransactionID());
237         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
238                 DataStoreVersions.CURRENT_VERSION);
239         cohortCache.put(message.getTransactionID(), cohortEntry);
240         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
241
242         if(!queueCohortEntry(cohortEntry, sender, shard)) {
243             return;
244         }
245
246         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
247
248         if (message.isDoCommitOnReady()) {
249             cohortEntry.setReplySender(sender);
250             cohortEntry.setShard(shard);
251             handleCanCommit(cohortEntry);
252         } else {
253             sender.tell(readyTransactionReply(shard), shard.self());
254         }
255     }
256
257     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
258             final int maxModificationsPerBatch) {
259         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
260         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
261             return Collections.singletonList(from);
262         }
263
264         cohortEntry.applyModifications(from.getModifications());
265
266         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
267         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
268             @Override
269             protected BatchedModifications getModifications() {
270                 if(newModifications.isEmpty() ||
271                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
272                     newModifications.add(new BatchedModifications(from.getTransactionID(),
273                             from.getVersion(), from.getTransactionChainID()));
274                 }
275
276                 return newModifications.getLast();
277             }
278         });
279
280         BatchedModifications last = newModifications.getLast();
281         last.setDoCommitOnReady(from.isDoCommitOnReady());
282         last.setReady(from.isReady());
283         last.setTotalMessagesSent(newModifications.size());
284         return newModifications;
285     }
286
287     private void handleCanCommit(CohortEntry cohortEntry) {
288         String transactionID = cohortEntry.getTransactionID();
289
290         cohortEntry.updateLastAccessTime();
291
292         if(currentCohortEntry != null) {
293             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
294             // queue and will get processed after all prior entries complete.
295
296             if(log.isDebugEnabled()) {
297                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
298                         name, currentCohortEntry.getTransactionID(), transactionID);
299             }
300
301             return;
302         }
303
304         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
305         // it the current entry and proceed with canCommit.
306         // Purposely checking reference equality here.
307         if(queuedCohortEntries.peek() == cohortEntry) {
308             currentCohortEntry = queuedCohortEntries.poll();
309             doCanCommit(currentCohortEntry);
310         } else {
311             if(log.isDebugEnabled()) {
312                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
313                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
314                                 transactionID);
315             }
316         }
317     }
318
319     /**
320      * This method handles the canCommit phase for a transaction.
321      *
322      * @param transactionID the ID of the transaction to canCommit
323      * @param sender the actor to which to send the response
324      * @param shard the transaction's shard actor
325      */
326     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
327         // Lookup the cohort entry that was cached previously (or should have been) by
328         // transactionReady (via the ForwardedReadyTransaction message).
329         final CohortEntry cohortEntry = cohortCache.get(transactionID);
330         if(cohortEntry == null) {
331             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
332             // between canCommit and ready and the entry was expired from the cache.
333             IllegalStateException ex = new IllegalStateException(
334                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
335             log.error(ex.getMessage());
336             sender.tell(new Failure(ex), shard.self());
337             return;
338         }
339
340         cohortEntry.setReplySender(sender);
341         cohortEntry.setShard(shard);
342
343         handleCanCommit(cohortEntry);
344     }
345
346     private void doCanCommit(final CohortEntry cohortEntry) {
347         boolean canCommit = false;
348         try {
349             canCommit = cohortEntry.canCommit();
350
351             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
352
353             if(cohortEntry.isDoImmediateCommit()) {
354                 if(canCommit) {
355                     doCommit(cohortEntry);
356                 } else {
357                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
358                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
359                 }
360             } else {
361                 cohortEntry.getReplySender().tell(
362                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
363                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
364                         cohortEntry.getShard().self());
365             }
366         } catch (Exception e) {
367             log.debug("{}: An exception occurred during canCommit", name, e);
368
369             Throwable failure = e;
370             if(e instanceof ExecutionException) {
371                 failure = e.getCause();
372             }
373
374             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
375         } finally {
376             if(!canCommit) {
377                 // Remove the entry from the cache now.
378                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
379             }
380         }
381     }
382
383     private boolean doCommit(CohortEntry cohortEntry) {
384         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
385
386         boolean success = false;
387
388         // We perform the preCommit phase here atomically with the commit phase. This is an
389         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
390         // coordination of preCommit across shards in case of failure but preCommit should not
391         // normally fail since we ensure only one concurrent 3-phase commit.
392
393         try {
394             cohortEntry.preCommit();
395
396             cohortEntry.getShard().continueCommit(cohortEntry);
397
398             cohortEntry.updateLastAccessTime();
399
400             success = true;
401         } catch (Exception e) {
402             log.error("{} An exception occurred while preCommitting transaction {}",
403                     name, cohortEntry.getTransactionID(), e);
404             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
405
406             currentTransactionComplete(cohortEntry.getTransactionID(), true);
407         }
408
409         return success;
410     }
411
412     /**
413      * This method handles the preCommit and commit phases for a transaction.
414      *
415      * @param transactionID the ID of the transaction to commit
416      * @param sender the actor to which to send the response
417      * @param shard the transaction's shard actor
418      * @return true if the transaction was successfully prepared, false otherwise.
419      */
420     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
421         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
422         // this transaction.
423         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
424         if(cohortEntry == null) {
425             // We're not the current Tx - the Tx was likely expired b/c it took too long in
426             // between the canCommit and commit messages.
427             IllegalStateException ex = new IllegalStateException(
428                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
429                             name, transactionID));
430             log.error(ex.getMessage());
431             sender.tell(new Failure(ex), shard.self());
432             return false;
433         }
434
435         cohortEntry.setReplySender(sender);
436         return doCommit(cohortEntry);
437     }
438
439     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
440         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
441         if(cohortEntry != null) {
442             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
443             // aborted during replication in which case we may still commit locally if replication
444             // succeeds.
445             currentTransactionComplete(transactionID, false);
446         } else {
447             cohortEntry = getAndRemoveCohortEntry(transactionID);
448         }
449
450         if(cohortEntry == null) {
451             return;
452         }
453
454         log.debug("{}: Aborting transaction {}", name, transactionID);
455
456         final ActorRef self = shard.getSelf();
457         try {
458             cohortEntry.abort();
459
460             shard.getShardMBean().incrementAbortTransactionsCount();
461
462             if(sender != null) {
463                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
464             }
465         } catch (Exception e) {
466             log.error("{}: An exception happened during abort", name, e);
467
468             if(sender != null) {
469                 sender.tell(new Failure(e), self);
470             }
471         }
472     }
473
474     void checkForExpiredTransactions(final long timeout, final Shard shard) {
475         CohortEntry cohortEntry = getCurrentCohortEntry();
476         if(cohortEntry != null) {
477             if(cohortEntry.isExpired(timeout)) {
478                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
479                         name, cohortEntry.getTransactionID(), timeout);
480
481                 handleAbort(cohortEntry.getTransactionID(), null, shard);
482             }
483         }
484
485         cleanupExpiredCohortEntries();
486     }
487
488     void abortPendingTransactions(final String reason, final Shard shard) {
489         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
490             return;
491         }
492
493         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
494
495         log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
496
497         for(CohortEntry cohortEntry: cohortEntries) {
498             if(cohortEntry.getReplySender() != null) {
499                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
500             }
501         }
502     }
503
504     private List<CohortEntry> getAndClearPendingCohortEntries() {
505         List<CohortEntry> cohortEntries = new ArrayList<>();
506         if(currentCohortEntry != null) {
507             cohortEntries.add(currentCohortEntry);
508             cohortCache.remove(currentCohortEntry.getTransactionID());
509             currentCohortEntry = null;
510         }
511
512         for(CohortEntry cohortEntry: queuedCohortEntries) {
513             cohortEntries.add(cohortEntry);
514             cohortCache.remove(cohortEntry.getTransactionID());
515         }
516
517         queuedCohortEntries.clear();
518         return cohortEntries;
519     }
520
521     Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
522         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
523             return Collections.emptyList();
524         }
525
526         Collection<Object> messages = new ArrayList<>();
527         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
528         for(CohortEntry cohortEntry: cohortEntries) {
529             if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
530                 continue;
531             }
532
533             final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
534             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
535                 @Override
536                 protected BatchedModifications getModifications() {
537                     if(newModifications.isEmpty() ||
538                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
539                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
540                                 cohortEntry.getClientVersion(), ""));
541                     }
542
543                     return newModifications.getLast();
544                 }
545             });
546
547             if(!newModifications.isEmpty()) {
548                 BatchedModifications last = newModifications.getLast();
549                 last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
550                 last.setReady(true);
551                 last.setTotalMessagesSent(newModifications.size());
552                 messages.addAll(newModifications);
553
554                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
555                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
556                             cohortEntry.getClientVersion()));
557                 }
558
559                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
560                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
561                             cohortEntry.getClientVersion()));
562                 }
563             }
564         }
565
566         return messages;
567     }
568
569     /**
570      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
571      * matches the current entry.
572      *
573      * @param transactionID the ID of the transaction
574      * @return the current CohortEntry or null if the given transaction ID does not match the
575      *         current entry.
576      */
577     CohortEntry getCohortEntryIfCurrent(String transactionID) {
578         if(isCurrentTransaction(transactionID)) {
579             return currentCohortEntry;
580         }
581
582         return null;
583     }
584
585     CohortEntry getCurrentCohortEntry() {
586         return currentCohortEntry;
587     }
588
589     CohortEntry getAndRemoveCohortEntry(String transactionID) {
590         return cohortCache.remove(transactionID);
591     }
592
593     boolean isCurrentTransaction(String transactionID) {
594         return currentCohortEntry != null &&
595                 currentCohortEntry.getTransactionID().equals(transactionID);
596     }
597
598     /**
599      * This method is called when a transaction is complete, successful or not. If the given
600      * given transaction ID matches the current in-progress transaction, the next cohort entry,
601      * if any, is dequeued and processed.
602      *
603      * @param transactionID the ID of the completed transaction
604      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
605      *        the cache.
606      */
607     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
608         if(removeCohortEntry) {
609             cohortCache.remove(transactionID);
610         }
611
612         if(isCurrentTransaction(transactionID)) {
613             currentCohortEntry = null;
614
615             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
616
617             maybeProcessNextCohortEntry();
618         }
619     }
620
621     private void maybeProcessNextCohortEntry() {
622         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
623         // clean out expired entries.
624         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
625         while(iter.hasNext()) {
626             CohortEntry next = iter.next();
627             if(next.isReadyToCommit()) {
628                 if(currentCohortEntry == null) {
629                     if(log.isDebugEnabled()) {
630                         log.debug("{}: Next entry to canCommit {}", name, next);
631                     }
632
633                     iter.remove();
634                     currentCohortEntry = next;
635                     currentCohortEntry.updateLastAccessTime();
636                     doCanCommit(currentCohortEntry);
637                 }
638
639                 break;
640             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
641                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
642                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
643             } else if(!next.isAborted()) {
644                 break;
645             }
646
647             iter.remove();
648             cohortCache.remove(next.getTransactionID());
649         }
650
651         maybeRunOperationOnPendingTransactionsComplete();
652     }
653
654     void cleanupExpiredCohortEntries() {
655         maybeProcessNextCohortEntry();
656     }
657
658     void setRunOnPendingTransactionsComplete(Runnable operation) {
659         runOnPendingTransactionsComplete = operation;
660         maybeRunOperationOnPendingTransactionsComplete();
661     }
662
663     private void maybeRunOperationOnPendingTransactionsComplete() {
664         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
665             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
666
667             runOnPendingTransactionsComplete.run();
668             runOnPendingTransactionsComplete = null;
669         }
670     }
671
672     @VisibleForTesting
673     void setCohortDecorator(CohortDecorator cohortDecorator) {
674         this.cohortDecorator = cohortDecorator;
675     }
676
677     static class CohortEntry {
/**
 * States a cohort entry can be in. A new entry starts as {@link #PENDING}.
 */
enum State {
    PENDING, CAN_COMMITTED, PRE_COMMITTED, COMMITTED, ABORTED
}
685
686         private final String transactionID;
687         private ShardDataTreeCohort cohort;
688         private final ReadWriteShardDataTreeTransaction transaction;
689         private RuntimeException lastBatchedModificationsException;
690         private ActorRef replySender;
691         private Shard shard;
692         private boolean doImmediateCommit;
693         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
694         private int totalBatchedModificationsReceived;
695         private State state = State.PENDING;
696         private final short clientVersion;
697
/**
 * Creates an entry backed by a read-write transaction; no cohort is set yet.
 *
 * @param transactionID the ID of the transaction
 * @param transaction the transaction to which modifications are applied; must not be null
 * @param clientVersion the version reported by the client
 */
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
    this.transactionID = transactionID;
    this.clientVersion = clientVersion;
    this.transaction = Preconditions.checkNotNull(transaction);
}
703
704         CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
705             this.transactionID = transactionID;
706             this.cohort = cohort;
707             this.transaction = null;
708             this.clientVersion = clientVersion;
709         }
710
711         void updateLastAccessTime() {
712             lastAccessTimer.reset();
713             lastAccessTimer.start();
714         }
715
716         String getTransactionID() {
717             return transactionID;
718         }
719
720         short getClientVersion() {
721             return clientVersion;
722         }
723
724         State getState() {
725             return state;
726         }
727
728         DataTreeCandidate getCandidate() {
729             return cohort.getCandidate();
730         }
731
732         DataTreeModification getDataTreeModification() {
733             return cohort.getDataTreeModification();
734         }
735
736         ReadWriteShardDataTreeTransaction getTransaction() {
737             return transaction;
738         }
739
740         int getTotalBatchedModificationsReceived() {
741             return totalBatchedModificationsReceived;
742         }
743
744         RuntimeException getLastBatchedModificationsException() {
745             return lastBatchedModificationsException;
746         }
747
748         void applyModifications(Iterable<Modification> modifications) {
749             totalBatchedModificationsReceived++;
750             if(lastBatchedModificationsException == null) {
751                 for (Modification modification : modifications) {
752                         try {
753                             modification.apply(transaction.getSnapshot());
754                         } catch (RuntimeException e) {
755                             lastBatchedModificationsException = e;
756                             throw e;
757                         }
758                 }
759             }
760         }
761
762         boolean canCommit() throws InterruptedException, ExecutionException {
763             state = State.CAN_COMMITTED;
764
765             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
766             // about possibly accessing our state on a different thread outside of our dispatcher.
767             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
768             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
769             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
770             return cohort.canCommit().get();
771         }
772
773         void preCommit() throws InterruptedException, ExecutionException {
774             state = State.PRE_COMMITTED;
775             cohort.preCommit().get();
776         }
777
778         void commit() throws InterruptedException, ExecutionException {
779             state = State.COMMITTED;
780             cohort.commit().get();
781         }
782
783         void abort() throws InterruptedException, ExecutionException {
784             state = State.ABORTED;
785             cohort.abort().get();
786         }
787
788         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
789             Preconditions.checkState(cohort == null, "cohort was already set");
790
791             setDoImmediateCommit(doImmediateCommit);
792
793             cohort = transaction.ready();
794
795             if(cohortDecorator != null) {
796                 // Call the hook for unit tests.
797                 cohort = cohortDecorator.decorate(transactionID, cohort);
798             }
799         }
800
801         boolean isReadyToCommit() {
802             return replySender != null;
803         }
804
805         boolean isExpired(long expireTimeInMillis) {
806             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
807         }
808
809         boolean isDoImmediateCommit() {
810             return doImmediateCommit;
811         }
812
813         void setDoImmediateCommit(boolean doImmediateCommit) {
814             this.doImmediateCommit = doImmediateCommit;
815         }
816
817         ActorRef getReplySender() {
818             return replySender;
819         }
820
821         void setReplySender(ActorRef replySender) {
822             this.replySender = replySender;
823         }
824
825         Shard getShard() {
826             return shard;
827         }
828
829         void setShard(Shard shard) {
830             this.shard = shard;
831         }
832
833
834         boolean isAborted() {
835             return state == State.ABORTED;
836         }
837
838         @Override
839         public String toString() {
840             StringBuilder builder = new StringBuilder();
841             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
842                     .append(doImmediateCommit).append("]");
843             return builder.toString();
844         }
845     }
846 }