BUG-5626: split out CohortEntry
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Queue;
24 import java.util.concurrent.ExecutionException;
25 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39
40 /**
41  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
42  *
43  * @author Thomas Pantelis
44  */
45 final class ShardCommitCoordinator {
46
47     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
48     public interface CohortDecorator {
49         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
50     }
51
52     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
53
54     private CohortEntry currentCohortEntry;
55
56     private final ShardDataTree dataTree;
57
58     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
59
60     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
61     // since this should only be accessed on the shard's dispatcher.
62     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
63
64     private int queueCapacity;
65
66     private final Logger log;
67
68     private final String name;
69
70     private final long cacheExpiryTimeoutInMillis;
71
72     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
73     private CohortDecorator cohortDecorator;
74
75     private ReadyTransactionReply readyTransactionReply;
76
77     private Runnable runOnPendingTransactionsComplete;
78
79     ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
80             String name) {
81
82         this.queueCapacity = queueCapacity;
83         this.log = log;
84         this.name = name;
85         this.dataTree = Preconditions.checkNotNull(dataTree);
86         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
87     }
88
89     int getQueueSize() {
90         return queuedCohortEntries.size();
91     }
92
93     int getCohortCacheSize() {
94         return cohortCache.size();
95     }
96
97     void setQueueCapacity(int queueCapacity) {
98         this.queueCapacity = queueCapacity;
99     }
100
101     private ReadyTransactionReply readyTransactionReply(Shard shard) {
102         if(readyTransactionReply == null) {
103             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
104         }
105
106         return readyTransactionReply;
107     }
108
109     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
110         if(queuedCohortEntries.size() < queueCapacity) {
111             queuedCohortEntries.offer(cohortEntry);
112
113             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
114                     queuedCohortEntries.size());
115
116             return true;
117         } else {
118             cohortCache.remove(cohortEntry.getTransactionID());
119
120             final RuntimeException ex = new RuntimeException(
121                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
122                                   " capacity %d has been reached.",
123                                   name, cohortEntry.getTransactionID(), queueCapacity));
124             log.error(ex.getMessage());
125             sender.tell(new Failure(ex), shard.self());
126             return false;
127         }
128     }
129
130     /**
131      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
132      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
133      *
134      * @param ready the ForwardedReadyTransaction message to process
135      * @param sender the sender of the message
136      * @param shard the transaction's shard actor
137      * @param schema
138      */
139     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
140             SchemaContext schema) {
141         log.debug("{}: Readying transaction {}, client version {}", name,
142                 ready.getTransactionID(), ready.getTxnClientVersion());
143
144         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
145         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
146         cohortCache.put(ready.getTransactionID(), cohortEntry);
147
148         if(!queueCohortEntry(cohortEntry, sender, shard)) {
149             return;
150         }
151
152         if(ready.isDoImmediateCommit()) {
153             cohortEntry.setDoImmediateCommit(true);
154             cohortEntry.setReplySender(sender);
155             cohortEntry.setShard(shard);
156             handleCanCommit(cohortEntry);
157         } else {
158             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
159             // front-end so send back a ReadyTransactionReply with our actor path.
160             sender.tell(readyTransactionReply(shard), shard.self());
161         }
162     }
163
164     /**
165      * This method handles a BatchedModifications message for a transaction being prepared directly on the
166      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
167      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
168      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
169      *
170      * @param batched the BatchedModifications message to process
171      * @param sender the sender of the message
172      * @param shard the transaction's shard actor
173      */
174     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
175         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
176         if(cohortEntry == null) {
177             cohortEntry = new CohortEntry(batched.getTransactionID(),
178                     dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
179                     cohortRegistry, schema,  batched.getVersion());
180             cohortCache.put(batched.getTransactionID(), cohortEntry);
181         }
182
183         if(log.isDebugEnabled()) {
184             log.debug("{}: Applying {} batched modifications for Tx {}", name,
185                     batched.getModifications().size(), batched.getTransactionID());
186         }
187
188         cohortEntry.applyModifications(batched.getModifications());
189
190         if(batched.isReady()) {
191             if(cohortEntry.getLastBatchedModificationsException() != null) {
192                 cohortCache.remove(cohortEntry.getTransactionID());
193                 throw cohortEntry.getLastBatchedModificationsException();
194             }
195
196             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
197                 cohortCache.remove(cohortEntry.getTransactionID());
198                 throw new IllegalStateException(String.format(
199                         "The total number of batched messages received %d does not match the number sent %d",
200                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
201             }
202
203             if(!queueCohortEntry(cohortEntry, sender, shard)) {
204                 return;
205             }
206
207             if(log.isDebugEnabled()) {
208                 log.debug("{}: Readying Tx {}, client version {}", name,
209                         batched.getTransactionID(), batched.getVersion());
210             }
211
212             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
213
214             if(batched.isDoCommitOnReady()) {
215                 cohortEntry.setReplySender(sender);
216                 cohortEntry.setShard(shard);
217                 handleCanCommit(cohortEntry);
218             } else {
219                 sender.tell(readyTransactionReply(shard), shard.self());
220             }
221         } else {
222             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
223         }
224     }
225
226     /**
227      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
228      * been prepared beforehand by the sender and we just need to drive them through into the
229      * dataTree.
230      *
231      * @param message the ReadyLocalTransaction message to process
232      * @param sender the sender of the message
233      * @param shard the transaction's shard actor
234      */
235     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
236             SchemaContext schema) {
237         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
238                 message.getTransactionID());
239         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
240                 DataStoreVersions.CURRENT_VERSION);
241         cohortCache.put(message.getTransactionID(), cohortEntry);
242         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
243
244         if(!queueCohortEntry(cohortEntry, sender, shard)) {
245             return;
246         }
247
248         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
249
250         if (message.isDoCommitOnReady()) {
251             cohortEntry.setReplySender(sender);
252             cohortEntry.setShard(shard);
253             handleCanCommit(cohortEntry);
254         } else {
255             sender.tell(readyTransactionReply(shard), shard.self());
256         }
257     }
258
259     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
260             final int maxModificationsPerBatch) {
261         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
262         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
263             return Collections.singletonList(from);
264         }
265
266         cohortEntry.applyModifications(from.getModifications());
267
268         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
269         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
270             @Override
271             protected BatchedModifications getModifications() {
272                 if(newModifications.isEmpty() ||
273                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
274                     newModifications.add(new BatchedModifications(from.getTransactionID(),
275                             from.getVersion(), from.getTransactionChainID()));
276                 }
277
278                 return newModifications.getLast();
279             }
280         });
281
282         BatchedModifications last = newModifications.getLast();
283         last.setDoCommitOnReady(from.isDoCommitOnReady());
284         last.setReady(from.isReady());
285         last.setTotalMessagesSent(newModifications.size());
286         return newModifications;
287     }
288
289     private void handleCanCommit(CohortEntry cohortEntry) {
290         String transactionID = cohortEntry.getTransactionID();
291
292         cohortEntry.updateLastAccessTime();
293
294         if(currentCohortEntry != null) {
295             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
296             // queue and will get processed after all prior entries complete.
297
298             if(log.isDebugEnabled()) {
299                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
300                         name, currentCohortEntry.getTransactionID(), transactionID);
301             }
302
303             return;
304         }
305
306         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
307         // it the current entry and proceed with canCommit.
308         // Purposely checking reference equality here.
309         if(queuedCohortEntries.peek() == cohortEntry) {
310             currentCohortEntry = queuedCohortEntries.poll();
311             doCanCommit(currentCohortEntry);
312         } else {
313             if(log.isDebugEnabled()) {
314                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
315                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
316                                 transactionID);
317             }
318         }
319     }
320
321     /**
322      * This method handles the canCommit phase for a transaction.
323      *
324      * @param transactionID the ID of the transaction to canCommit
325      * @param sender the actor to which to send the response
326      * @param shard the transaction's shard actor
327      */
328     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
329         // Lookup the cohort entry that was cached previously (or should have been) by
330         // transactionReady (via the ForwardedReadyTransaction message).
331         final CohortEntry cohortEntry = cohortCache.get(transactionID);
332         if(cohortEntry == null) {
333             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
334             // between canCommit and ready and the entry was expired from the cache.
335             IllegalStateException ex = new IllegalStateException(
336                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
337             log.error(ex.getMessage());
338             sender.tell(new Failure(ex), shard.self());
339             return;
340         }
341
342         cohortEntry.setReplySender(sender);
343         cohortEntry.setShard(shard);
344
345         handleCanCommit(cohortEntry);
346     }
347
348     private void doCanCommit(final CohortEntry cohortEntry) {
349         boolean canCommit = false;
350         try {
351             canCommit = cohortEntry.canCommit();
352
353             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
354
355             if(cohortEntry.isDoImmediateCommit()) {
356                 if(canCommit) {
357                     doCommit(cohortEntry);
358                 } else {
359                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
360                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
361                 }
362             } else {
363                 cohortEntry.getReplySender().tell(
364                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
365                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
366                         cohortEntry.getShard().self());
367             }
368         } catch (Exception e) {
369             log.debug("{}: An exception occurred during canCommit", name, e);
370
371             Throwable failure = e;
372             if(e instanceof ExecutionException) {
373                 failure = e.getCause();
374             }
375
376             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
377         } finally {
378             if(!canCommit) {
379                 // Remove the entry from the cache now.
380                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
381             }
382         }
383     }
384
385     private boolean doCommit(CohortEntry cohortEntry) {
386         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
387
388         boolean success = false;
389
390         // We perform the preCommit phase here atomically with the commit phase. This is an
391         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
392         // coordination of preCommit across shards in case of failure but preCommit should not
393         // normally fail since we ensure only one concurrent 3-phase commit.
394
395         try {
396             cohortEntry.preCommit();
397
398             cohortEntry.getShard().continueCommit(cohortEntry);
399
400             cohortEntry.updateLastAccessTime();
401
402             success = true;
403         } catch (Exception e) {
404             log.error("{} An exception occurred while preCommitting transaction {}",
405                     name, cohortEntry.getTransactionID(), e);
406             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
407
408             currentTransactionComplete(cohortEntry.getTransactionID(), true);
409         }
410
411         return success;
412     }
413
414     /**
415      * This method handles the preCommit and commit phases for a transaction.
416      *
417      * @param transactionID the ID of the transaction to commit
418      * @param sender the actor to which to send the response
419      * @param shard the transaction's shard actor
420      * @return true if the transaction was successfully prepared, false otherwise.
421      */
422     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
423         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
424         // this transaction.
425         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
426         if(cohortEntry == null) {
427             // We're not the current Tx - the Tx was likely expired b/c it took too long in
428             // between the canCommit and commit messages.
429             IllegalStateException ex = new IllegalStateException(
430                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
431                             name, transactionID));
432             log.error(ex.getMessage());
433             sender.tell(new Failure(ex), shard.self());
434             return false;
435         }
436
437         cohortEntry.setReplySender(sender);
438         return doCommit(cohortEntry);
439     }
440
441     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
442         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
443         if(cohortEntry != null) {
444             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
445             // aborted during replication in which case we may still commit locally if replication
446             // succeeds.
447             currentTransactionComplete(transactionID, false);
448         } else {
449             cohortEntry = getAndRemoveCohortEntry(transactionID);
450         }
451
452         if(cohortEntry == null) {
453             return;
454         }
455
456         log.debug("{}: Aborting transaction {}", name, transactionID);
457
458         final ActorRef self = shard.getSelf();
459         try {
460             cohortEntry.abort();
461
462             shard.getShardMBean().incrementAbortTransactionsCount();
463
464             if(sender != null) {
465                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
466             }
467         } catch (Exception e) {
468             log.error("{}: An exception happened during abort", name, e);
469
470             if(sender != null) {
471                 sender.tell(new Failure(e), self);
472             }
473         }
474     }
475
476     void checkForExpiredTransactions(final long timeout, final Shard shard) {
477         CohortEntry cohortEntry = getCurrentCohortEntry();
478         if(cohortEntry != null) {
479             if(cohortEntry.isExpired(timeout)) {
480                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
481                         name, cohortEntry.getTransactionID(), timeout);
482
483                 handleAbort(cohortEntry.getTransactionID(), null, shard);
484             }
485         }
486
487         cleanupExpiredCohortEntries();
488     }
489
490     void abortPendingTransactions(final String reason, final Shard shard) {
491         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
492             return;
493         }
494
495         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
496
497         log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
498
499         for(CohortEntry cohortEntry: cohortEntries) {
500             if(cohortEntry.getReplySender() != null) {
501                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
502             }
503         }
504     }
505
506     private List<CohortEntry> getAndClearPendingCohortEntries() {
507         List<CohortEntry> cohortEntries = new ArrayList<>();
508
509         if(currentCohortEntry != null) {
510             cohortEntries.add(currentCohortEntry);
511             cohortCache.remove(currentCohortEntry.getTransactionID());
512             currentCohortEntry = null;
513         }
514
515         for(CohortEntry cohortEntry: queuedCohortEntries) {
516             cohortEntries.add(cohortEntry);
517             cohortCache.remove(cohortEntry.getTransactionID());
518         }
519
520         queuedCohortEntries.clear();
521         return cohortEntries;
522     }
523
524     Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
525         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
526             return Collections.emptyList();
527         }
528
529         Collection<Object> messages = new ArrayList<>();
530         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
531         for(CohortEntry cohortEntry: cohortEntries) {
532             if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
533                 continue;
534             }
535
536             final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
537             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
538                 @Override
539                 protected BatchedModifications getModifications() {
540                     if(newModifications.isEmpty() ||
541                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
542                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
543                                 cohortEntry.getClientVersion(), ""));
544         }
545
546                     return newModifications.getLast();
547                 }
548             });
549
550             if(!newModifications.isEmpty()) {
551                 BatchedModifications last = newModifications.getLast();
552                 last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
553                 last.setReady(true);
554                 last.setTotalMessagesSent(newModifications.size());
555                 messages.addAll(newModifications);
556
557                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
558                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
559                             cohortEntry.getClientVersion()));
560                 }
561
562                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
563                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
564                             cohortEntry.getClientVersion()));
565                 }
566             }
567         }
568
569         return messages;
570     }
571
572     /**
573      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
574      * matches the current entry.
575      *
576      * @param transactionID the ID of the transaction
577      * @return the current CohortEntry or null if the given transaction ID does not match the
578      *         current entry.
579      */
580     CohortEntry getCohortEntryIfCurrent(String transactionID) {
581         if(isCurrentTransaction(transactionID)) {
582             return currentCohortEntry;
583         }
584
585         return null;
586     }
587
588     CohortEntry getCurrentCohortEntry() {
589         return currentCohortEntry;
590     }
591
592     CohortEntry getAndRemoveCohortEntry(String transactionID) {
593         return cohortCache.remove(transactionID);
594     }
595
596     boolean isCurrentTransaction(String transactionID) {
597         return currentCohortEntry != null &&
598                 currentCohortEntry.getTransactionID().equals(transactionID);
599     }
600
601     /**
602      * This method is called when a transaction is complete, successful or not. If the given
603      * given transaction ID matches the current in-progress transaction, the next cohort entry,
604      * if any, is dequeued and processed.
605      *
606      * @param transactionID the ID of the completed transaction
607      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
608      *        the cache.
609      */
610     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
611         if(removeCohortEntry) {
612             cohortCache.remove(transactionID);
613         }
614
615         if(isCurrentTransaction(transactionID)) {
616             currentCohortEntry = null;
617
618             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
619
620             maybeProcessNextCohortEntry();
621         }
622     }
623
624     private void maybeProcessNextCohortEntry() {
625         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
626         // clean out expired entries.
627         final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
628         while(iter.hasNext()) {
629             final CohortEntry next = iter.next();
630             if(next.isReadyToCommit()) {
631                 if(currentCohortEntry == null) {
632                     if(log.isDebugEnabled()) {
633                         log.debug("{}: Next entry to canCommit {}", name, next);
634                     }
635
636                     iter.remove();
637                     currentCohortEntry = next;
638                     currentCohortEntry.updateLastAccessTime();
639                     doCanCommit(currentCohortEntry);
640                 }
641
642                 break;
643             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
644                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
645                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
646             } else if(!next.isAborted()) {
647                 break;
648             }
649
650             iter.remove();
651             cohortCache.remove(next.getTransactionID());
652         }
653
654         maybeRunOperationOnPendingTransactionsComplete();
655     }
656
657     void cleanupExpiredCohortEntries() {
658         maybeProcessNextCohortEntry();
659     }
660
661     void setRunOnPendingTransactionsComplete(Runnable operation) {
662         runOnPendingTransactionsComplete = operation;
663         maybeRunOperationOnPendingTransactionsComplete();
664     }
665
666     private void maybeRunOperationOnPendingTransactionsComplete() {
667         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
668             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
669
670             runOnPendingTransactionsComplete.run();
671             runOnPendingTransactionsComplete = null;
672         }
673     }
674
675     @VisibleForTesting
676     void setCohortDecorator(CohortDecorator cohortDecorator) {
677         this.cohortDecorator = cohortDecorator;
678     }
679
680    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
681         cohortRegistry.process(sender, message);
682     }
683 }