/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.primitives.UnsignedLong;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/**
 * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
 * e.g. it does not expose public interfaces and assumes it is only ever called from a
 * single thread.
 *
 * <p>This class is not part of the API contract and is subject to change at any time.
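 *
 * <p>Illustrative (non-normative) sketch of a transaction lifecycle against this class, using only
 * methods defined below; the local variable names are purely for illustration:
 * <pre>
 *     ReadWriteShardDataTreeTransaction tx = shardDataTree.newReadWriteTransaction(txId);
 *     // ... populate tx.getSnapshot() with modifications ...
 *     ShardDataTreeCohort cohort = shardDataTree.finishTransaction(tx);
 *     // the Shard actor then drives the cohort through startCanCommit(), startPreCommit() and
 *     // startCommit(), with replication completion reported via payloadReplicationComplete()
 * </pre>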
 */
@NotThreadSafe
public class ShardDataTree extends ShardDataTreeTransactionParent {
    private static final class CommitEntry {
        final SimpleShardDataTreeCohort cohort;
        long lastAccess;

        CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
            this.cohort = Preconditions.checkNotNull(cohort);
            lastAccess = now;
        }
    }

    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);

    private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
    private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
    private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
    private final TipProducingDataTree dataTree;
    private final String logContext;
    private final Shard shard;
    private Runnable runOnPendingTransactionsComplete;

    private SchemaContext schemaContext;

    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
        this.dataTree = dataTree;
        updateSchemaContext(schemaContext);

        this.shard = Preconditions.checkNotNull(shard);
        this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
        this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
        this.logContext = Preconditions.checkNotNull(logContext);
    }

    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
        this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
                treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
    }

    @VisibleForTesting
    public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
        this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
                new DefaultShardDataChangeListenerPublisher(), "");
    }

    String logContext() {
        return logContext;
    }

    public TipProducingDataTree getDataTree() {
        return dataTree;
    }

    SchemaContext getSchemaContext() {
        return schemaContext;
    }

    void updateSchemaContext(final SchemaContext schemaContext) {
        dataTree.setSchemaContext(schemaContext);
        this.schemaContext = Preconditions.checkNotNull(schemaContext);
    }

    ShardDataTreeSnapshot takeRecoverySnapshot() {
        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
    }

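    /**
     * Apply a transaction replayed during recovery directly to the data tree, bypassing the commit
     * queue. The transaction's modification is readied, validated, prepared and committed in one
     * step; listeners are not notified here.
     */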
    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction)
            throws DataValidationFailedException {
        // FIXME: purge any outstanding transactions

        final DataTreeModification snapshot = transaction.getSnapshot();
        snapshot.ready();

        dataTree.validate(snapshot);
        dataTree.commit(dataTree.prepare(snapshot));
    }

    private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
        ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
        if (chain == null) {
            chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
            transactionChains.put(localHistoryIdentifier, chain);
        }

        return chain;
    }

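    /**
     * Create a new read-only transaction. Stand-alone transactions (history id 0) read directly
     * from the current data tree snapshot; chained transactions are delegated to their
     * {@link ShardDataTreeTransactionChain}.
     */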
    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
        if (txId.getHistoryId().getHistoryId() == 0) {
            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
        }

        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
    }

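    /**
     * Create a new read-write transaction. As with read-only transactions, history id 0 indicates
     * a stand-alone transaction backed by a fresh modification of the current snapshot; otherwise
     * the transaction is allocated from the corresponding transaction chain.
     */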
    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
        if (txId.getHistoryId().getHistoryId() == 0) {
            return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
                    .newModification());
        }

        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
    }

    public void notifyListeners(final DataTreeCandidate candidate) {
        treeChangeListenerPublisher.publishChanges(candidate, logContext);
        dataChangeListenerPublisher.publishChanges(candidate, logContext);
    }

    void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
            NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
        if (currentState.isPresent()) {
            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
                    listenerReg.getScope());
            localPublisher.publishChanges(currentState.get(), logContext);
        }
    }

    void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
            final Optional<DataTreeCandidate> currentState) {
        if (currentState.isPresent()) {
            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
            localPublisher.registerTreeChangeListener(path, listener);
            localPublisher.publishChanges(currentState.get(), logContext);
        }
    }

    void closeAllTransactionChains() {
        for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
            chain.close();
        }

        transactionChains.clear();
    }

    void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
        if (chain != null) {
            chain.close();
        } else {
            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
        }
    }

    Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
            Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
                    final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
                    final DataChangeScope scope) {
        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);

        return new SimpleEntry<>(reg, readCurrentData());
    }

    private Optional<DataTreeCandidate> readCurrentData() {
        final Optional<NormalizedNode<?, ?>> currentState =
                dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
        return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
            YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
    }

    public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
            final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
                path, listener);

        return new SimpleEntry<>(reg, readCurrentData());
    }

    int getQueueSize() {
        return pendingTransactions.size();
    }

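    /**
     * Apply a candidate originating outside this shard's commit queue, e.g. state replicated from
     * the leader. The candidate is replayed onto a fresh modification, validated, prepared and
     * committed, and listeners are notified of the resulting change.
     */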
    void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign)
            throws DataValidationFailedException {
        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);

        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
        DataTreeCandidates.applyToModification(mod, foreign);
        mod.ready();

        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
        dataTree.validate(mod);
        final DataTreeCandidate candidate = dataTree.prepare(mod);
        dataTree.commit(candidate);
        notifyListeners(candidate);
    }

    @Override
    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
        // Intentional no-op
    }

    @Override
    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
        final DataTreeModification snapshot = transaction.getSnapshot();
        snapshot.ready();

        return createReadyCohort(transaction.getId(), snapshot);
    }

    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
        return dataTree.takeSnapshot().readNode(path);
    }

    public DataTreeSnapshot takeSnapshot() {
        return dataTree.takeSnapshot();
    }

    public DataTreeModification newModification() {
        return dataTree.takeSnapshot().newModification();
    }

    @VisibleForTesting
    // FIXME: This should be removed, it violates encapsulation
    public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
        modification.ready();
        dataTree.validate(modification);
        DataTreeCandidateTip candidate = dataTree.prepare(modification);
        dataTree.commit(candidate);
        return candidate;
    }

    public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
        Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
        for (CommitEntry entry : pendingTransactions) {
            ret.add(entry.cohort);
        }

        pendingTransactions.clear();
        return ret;
    }

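    /**
     * Run the canCommit (validation) step for the transaction at the head of the queue. On
     * validation failure the head entry is removed, its cohort is failed and the loop moves on to
     * the next queued transaction.
     */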
    private void processNextTransaction() {
        while (!pendingTransactions.isEmpty()) {
            final CommitEntry entry = pendingTransactions.peek();
            final SimpleShardDataTreeCohort cohort = entry.cohort;
            final DataTreeModification modification = cohort.getDataTreeModification();

            if (cohort.getState() != State.CAN_COMMIT_PENDING) {
                break;
            }

            LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
            Exception cause;
            try {
                dataTree.validate(modification);
                LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                cohort.successfulCanCommit();
                entry.lastAccess = shard.ticker().read();
                return;
            } catch (ConflictingModificationAppliedException e) {
                LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
                    e.getPath());
                cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
            } catch (DataValidationFailedException e) {
                LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
                    e.getPath(), e);

                // For debugging purposes, allow dumping of the modification. Coupled with the above
                // precondition log, it should allow us to understand what went on.
                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
                    modification, dataTree);
                cause = new TransactionCommitFailedException("Data did not pass validation.", e);
            } catch (Exception e) {
                LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
                cause = e;
            }

            // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
            pendingTransactions.poll().cohort.failedCanCommit(cause);
        }

        maybeRunOperationOnPendingTransactionsComplete();
    }

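    /**
     * Start the canCommit step for the given cohort. Only the transaction at the head of the queue
     * is validated immediately; any other cohort simply stays queued until it reaches the head.
     */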
    void startCanCommit(final SimpleShardDataTreeCohort cohort) {
        final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
        if (!cohort.equals(current)) {
            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
            return;
        }

        processNextTransaction();
    }

    private void failPreCommit(final Exception cause) {
        shard.getShardMBean().incrementFailedTransactionsCount();
        pendingTransactions.poll().cohort.failedPreCommit(cause);
        processNextTransaction();
    }

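    /**
     * Start the preCommit step for the given cohort, which must be at the head of the queue. The
     * cohort's modification is prepared into a candidate and its user preCommit step is invoked;
     * any failure is reported via failPreCommit().
     */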
    void startPreCommit(final SimpleShardDataTreeCohort cohort) {
        final CommitEntry entry = pendingTransactions.peek();
        Preconditions.checkState(entry != null, "Attempted to pre-commit %s when no transactions pending", cohort);

        final SimpleShardDataTreeCohort current = entry.cohort;
        Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
        final DataTreeCandidateTip candidate;
        try {
            candidate = dataTree.prepare(cohort.getDataTreeModification());
        } catch (Exception e) {
            failPreCommit(e);
            return;
        }

        try {
            cohort.userPreCommit(candidate);
        } catch (ExecutionException | TimeoutException e) {
            failPreCommit(e);
            return;
        }

        entry.lastAccess = shard.ticker().read();
        cohort.successfulPreCommit(candidate);
    }

    private void failCommit(final Exception cause) {
        shard.getShardMBean().incrementFailedTransactionsCount();
        pendingTransactions.poll().cohort.failedCommit(cause);
        processNextTransaction();
    }

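    /**
     * Apply the prepared candidate of the head cohort to the data tree, update commit statistics,
     * report success to the cohort and notify listeners. An IllegalStateException from the data
     * tree is retried by re-applying the candidate as a foreign candidate (see comment below).
     */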
    private void finishCommit(final SimpleShardDataTreeCohort cohort) {
        final TransactionIdentifier txId = cohort.getIdentifier();
        final DataTreeCandidate candidate = cohort.getCandidate();

        LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);

        try {
            try {
                dataTree.commit(candidate);
            } catch (IllegalStateException e) {
                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
                // applying it to the state. We then become the leader and a second tx is pre-committed and
                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
                // candidate via applyState prior to the second tx. Since the second tx has already been
                // pre-committed, when it gets here to commit it will get an IllegalStateException.

                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
                // solution will be forthcoming.

                LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
                applyForeignCandidate(txId, candidate);
            }
        } catch (Exception e) {
            failCommit(e);
            return;
        }

        shard.getShardMBean().incrementCommittedTransactionCount();
        shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());

        // FIXME: propagate journal index

        pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);

        LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
        notifyListeners(candidate);

        processNextTransaction();
    }

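    /**
     * Start the final commit step for the head cohort. If replication can be skipped, or the
     * candidate carries no modifications, the commit is finished locally; otherwise the candidate
     * is encoded into a {@link CommitTransactionPayload} and handed to the Shard for persistence
     * and replication, after which payloadReplicationComplete() resumes the commit.
     */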
    void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
        final CommitEntry entry = pendingTransactions.peek();
        Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);

        final SimpleShardDataTreeCohort current = entry.cohort;
        Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);

        if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
            LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
            finishCommit(cohort);
            return;
        }

        final TransactionIdentifier txId = cohort.getIdentifier();
        final Payload payload;
        try {
            payload = CommitTransactionPayload.create(txId, candidate);
        } catch (IOException e) {
            LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
            pendingTransactions.poll().cohort.failedCommit(e);
            return;
        }

        // Once completed, we will continue via payloadReplicationComplete
        entry.lastAccess = shard.ticker().read();
        shard.persistPayload(txId, payload);
        LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
    }

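    /**
     * Called once consensus has been reached on a replicated transaction payload. The commit of
     * the head cohort is resumed; consensus notifications which do not match the head of the queue
     * are logged and ignored.
     */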
    private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
        final CommitEntry current = pendingTransactions.peek();
        if (current == null) {
            LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
            return;
        }

        if (!current.cohort.getIdentifier().equals(txId)) {
            LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
                current.cohort.getIdentifier(), txId);
            return;
        }

        finishCommit(current.cohort);
    }

    void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
        // For now we do not care about anything else but transactions
        Verify.verify(identifier instanceof TransactionIdentifier);
        payloadReplicationComplete((TransactionIdentifier)identifier, payload);
    }

    void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
        cohortRegistry.process(sender, message);
    }

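    /**
     * Create a cohort for a readied modification and append it to the pending-transaction queue.
     * The cohort is backed by the cohort actor registry, which enforces the per-step timeout for
     * user commit cohorts.
     */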
    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
            final DataTreeModification modification) {
        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
                cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
        pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
        return cohort;
    }

    void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
            throws DataValidationFailedException, IOException {
        applyForeignCandidate(identifier, payload.getCandidate().getValue());
    }

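    /**
     * Check whether the transaction at the head of the queue has exceeded the commit timeout and,
     * if so, fail or drop it according to its current state. A transaction in COMMIT_PENDING is
     * never aborted here; its last-access time is simply refreshed.
     */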
    void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
        final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
        final long now = shard.ticker().read();
        final CommitEntry currentTx = pendingTransactions.peek();
        if (currentTx != null && currentTx.lastAccess + timeout < now) {
            LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
                    currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
            boolean processNext = true;
            switch (currentTx.cohort.getState()) {
                case CAN_COMMIT_PENDING:
                    pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
                    break;
                case CAN_COMMIT_COMPLETE:
                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
                    break;
                case PRE_COMMIT_PENDING:
                    pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
                    break;
                case PRE_COMMIT_COMPLETE:
                    // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
                    //        are ready we should commit the transaction, not abort it. Our current software stack does
                    //        not allow us to do that consistently, because we persist at the time of commit, hence
                    //        we can end up in a state where we have pre-committed a transaction, then a leader failover
                    //        occurred ... the new leader does not see the pre-committed transaction and does not have
                    //        a running timer. To fix this we really need two persistence events.
                    //
                    //        The first one, done at pre-commit time, will hold the transaction payload. When consensus
                    //        is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
                    //        apply the state in this event.
                    //
                    //        The second one, done at commit (or abort) time, holds only the transaction identifier and
                    //        signals to followers that the state should (or should not) be applied.
                    //
                    //        In order to make the pre-commit timer work across failovers, though, we need
                    //        a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
                    //        restart the timer.
                    pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
                    break;
                case COMMIT_PENDING:
                    LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
                        currentTx.cohort.getIdentifier());
                    currentTx.lastAccess = now;
                    processNext = false;
                    break;
                case ABORTED:
                case COMMITTED:
                case FAILED:
                case READY:
                default:
                    pendingTransactions.poll();
            }

            if (processNext) {
                processNextTransaction();
            }
        }
    }

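    /**
     * Abort a queued transaction. The head of the queue gets special treatment: if it is already
     * in COMMIT_PENDING it cannot be aborted, otherwise it is removed and the next transaction is
     * processed. Transactions further down the queue are simply removed.
     */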
    void startAbort(final SimpleShardDataTreeCohort cohort) {
        final Iterator<CommitEntry> it = pendingTransactions.iterator();
        if (!it.hasNext()) {
            LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
            return;
        }

        // First entry is special, as it may already be committing
        final CommitEntry first = it.next();
        if (cohort.equals(first.cohort)) {
            if (cohort.getState() != State.COMMIT_PENDING) {
                LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
                    cohort.getState());
                pendingTransactions.poll();
                processNextTransaction();
            } else {
                LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
            }

            return;
        }

        while (it.hasNext()) {
            final CommitEntry e = it.next();
            if (cohort.equals(e.cohort)) {
                LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
                it.remove();
                return;
            }
        }

        LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
    }

    void setRunOnPendingTransactionsComplete(final Runnable operation) {
        runOnPendingTransactionsComplete = operation;
        maybeRunOperationOnPendingTransactionsComplete();
    }

    private void maybeRunOperationOnPendingTransactionsComplete() {
        if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
            LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
                    runOnPendingTransactionsComplete);

            runOnPendingTransactionsComplete.run();
            runOnPendingTransactionsComplete = null;
        }
    }
}