2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Verify;
16 import com.google.common.primitives.UnsignedLong;
17 import java.io.IOException;
18 import java.util.AbstractMap.SimpleEntry;
19 import java.util.ArrayDeque;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.Iterator;
25 import java.util.Map.Entry;
26 import java.util.Queue;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import javax.annotation.concurrent.NotThreadSafe;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
34 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
35 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
36 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
37 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
38 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
39 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
40 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
41 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
42 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
45 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
46 import org.opendaylight.yangtools.concepts.Identifier;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
53 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
57 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
60 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.Duration;
67 * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
68 * e.g. it does not expose public interfaces and assumes it is only ever called from a
71 * This class is not part of the API contract and is subject to change at any time.
74 public class ShardDataTree extends ShardDataTreeTransactionParent {
// Queue entry pairing a pending cohort with a timestamp supplied at construction time.
// NOTE(review): the field that stores 'now' is not visible in this excerpt; other methods read and
// write entry.lastAccess, so presumably the timestamp is kept there - confirm.
75 private static final class CommitEntry {
76 final SimpleShardDataTreeCohort cohort;
// A null cohort is rejected up front.
79 CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
80 this.cohort = Preconditions.checkNotNull(cohort);
// Five-second timeout handed to the cohort registry for every ready cohort it creates.
85 private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
86 private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
// Transaction chains keyed by their local history identifier.
88 private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
89 private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
// Readied transactions in arrival order; the commit steps always operate on the head of this queue.
90 private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
91 private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
92 private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
93 private final TipProducingDataTree dataTree;
// Prefix passed as the first argument of this class's log messages.
94 private final String logContext;
95 private final Shard shard;
// Operation to run once pendingTransactions is empty; reset to null after it has run.
96 private Runnable runOnPendingTransactionsComplete;
98 private SchemaContext schemaContext;
// Primary constructor: wires an existing data tree and the two listener publishers into this instance.
// The schema context is pushed into the data tree through updateSchemaContext() before the remaining
// fields are assigned. shard, both publishers and logContext are null-checked; dataTree is stored
// without an explicit check and is dereferenced right away by updateSchemaContext().
100 public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
101 final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
102 final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
103 this.dataTree = dataTree;
104 updateSchemaContext(schemaContext);
106 this.shard = Preconditions.checkNotNull(shard);
107 this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
108 this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
109 this.logContext = Preconditions.checkNotNull(logContext);
// Convenience constructor: creates the backing tree for the given tree type through
// InMemoryDataTreeFactory and delegates to the primary constructor.
112 public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
113 final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
114 final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
115 this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
116 treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
// Convenience constructor: uses the default listener publishers and an empty log context.
120 public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
121 this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
122 new DefaultShardDataChangeListenerPublisher(), "");
// NOTE(review): the body is not visible in this excerpt; presumably returns the logContext field - confirm.
125 String logContext() {
// NOTE(review): the body is not visible in this excerpt; presumably returns the dataTree field - confirm.
129 public TipProducingDataTree getDataTree() {
// Returns the schema context currently stored in this instance.
133 SchemaContext getSchemaContext() {
134 return schemaContext;
// Installs a new schema context: it is handed to the data tree first and then stored locally.
// NOTE(review): the null check only runs after the value has already been passed to
// dataTree.setSchemaContext(); whether the tree rejects null itself cannot be told from here.
137 void updateSchemaContext(final SchemaContext schemaContext) {
138 dataTree.setSchemaContext(schemaContext);
139 this.schemaContext = Preconditions.checkNotNull(schemaContext);
// Wraps the root node read from a fresh snapshot of the data tree in a MetadataShardDataTreeSnapshot.
// The root is unwrapped with Optional.get(), so an absent root makes this call throw.
142 ShardDataTreeSnapshot takeRecoverySnapshot() {
143 return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
// Applies a transaction's modification straight to the data tree: validate, prepare, commit.
// The visible code does not notify listeners and does not touch the pending queue.
// Throws DataValidationFailedException, as declared, when the tree rejects the modification.
// NOTE(review): lines between obtaining the modification and validating it are not visible in this excerpt.
146 void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
147 // FIXME: purge any outstanding transactions
149 final DataTreeModification snapshot = transaction.getSnapshot();
152 dataTree.validate(snapshot);
153 dataTree.commit(dataTree.prepare(snapshot));
// Looks up the transaction chain for the given history; the visible lines also create a new chain
// bound to this tree and register it under the same identifier.
// NOTE(review): the condition guarding the creation and the return statement are not visible in this
// excerpt; presumably the chain is only created when the lookup finds nothing - confirm.
156 private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
157 ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
159 chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
160 transactionChains.put(localHistoryIdentifier, chain);
// Creates a read-only transaction. A history id of 0 is served directly from a fresh snapshot of the
// data tree; any other history goes through that history's transaction chain.
166 ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
167 if (txId.getHistoryId().getHistoryId() == 0) {
168 return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
171 return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
// Creates a read-write transaction. A history id of 0 gets a transaction parented by this tree and
// based on a fresh snapshot; any other history goes through that history's transaction chain.
// NOTE(review): the tail of the constructor call in the first branch is not visible in this excerpt.
174 ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
175 if (txId.getHistoryId().getHistoryId() == 0) {
176 return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
180 return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
// Publishes the candidate to the tree change listener publisher first, then to the data change
// listener publisher, tagging both calls with this tree's log context.
183 public void notifyListeners(final DataTreeCandidate candidate) {
184 treeChangeListenerPublisher.publishChanges(candidate, logContext);
185 dataChangeListenerPublisher.publishChanges(candidate, logContext);
// Delivers the current state to a single data change listener registration. When currentState is
// present, the listener is registered (same path, instance and scope) on a publisher obtained from
// dataChangeListenerPublisher.newInstance() and the state is published through that publisher only.
// Nothing is published when currentState is absent.
188 void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
189 NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
190 if (currentState.isPresent()) {
191 ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
192 localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
193 listenerReg.getScope());
194 localPublisher.publishChanges(currentState.get(), logContext);
// Delivers the current state to a single tree change listener. When currentState is present, the
// listener is registered for the path on a publisher obtained from
// treeChangeListenerPublisher.newInstance() and the state is published through that publisher only.
// Nothing is published when currentState is absent.
198 void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
199 final Optional<DataTreeCandidate> currentState) {
200 if (currentState.isPresent()) {
201 ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
202 localPublisher.registerTreeChangeListener(path, listener);
203 localPublisher.publishChanges(currentState.get(), logContext);
// Visits every known transaction chain and then empties the chain map.
// NOTE(review): the action taken on each chain inside the loop is not visible in this excerpt;
// presumably each chain is closed - confirm.
207 void closeAllTransactionChains() {
208 for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
212 transactionChains.clear();
// Removes the chain registered under the given identifier from the chain map. A debug message is
// logged for the case where no such chain exists.
// NOTE(review): the branch that handles a chain that was found is not visible in this excerpt.
215 void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
216 final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
220 LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
// Registers a data change listener for the path and scope with the shared publisher and returns the
// registration paired with a candidate describing the current tree contents (absent when the root
// node cannot be read).
224 Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
225 Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
226 final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
227 final DataChangeScope scope) {
228 final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
229 dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
231 return new SimpleEntry<>(reg, readCurrentData());
// Reads the root node from a fresh snapshot and, when present, converts it into a DataTreeCandidate
// rooted at the empty path; returns absent when there is no root node.
234 private Optional<DataTreeCandidate> readCurrentData() {
235 final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
236 return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
237 YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
// Registers a tree change listener with the shared publisher and returns the registration paired
// with a candidate describing the current tree contents (absent when the root node cannot be read).
// NOTE(review): the argument list of the registration call is not visible in this excerpt.
240 public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
241 final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
242 final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
245 return new SimpleEntry<>(reg, readCurrentData());
249 return pendingTransactions.size();
// Replays a candidate that was not prepared against this tree: the candidate is applied onto a new
// modification taken from a fresh snapshot, which is then validated, prepared and committed.
// Listeners are notified with the locally prepared candidate, not with the foreign one.
// Throws DataValidationFailedException, as declared, when validation fails.
// NOTE(review): lines between applying the candidate and the trace log are not visible in this excerpt.
252 void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
253 LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
255 final DataTreeModification mod = dataTree.takeSnapshot().newModification();
256 DataTreeCandidates.applyToModification(mod, foreign);
259 LOG.trace("{}: Applying foreign modification {}", logContext, mod);
260 dataTree.validate(mod);
261 final DataTreeCandidate candidate = dataTree.prepare(mod);
262 dataTree.commit(candidate);
263 notifyListeners(candidate);
// NOTE(review): the body is not visible in this excerpt, so its behaviour cannot be documented from here.
267 void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
// Turns a read-write transaction into a ready cohort by handing its identifier and modification to
// createReadyCohort(), which also queues it for commit.
// NOTE(review): lines between obtaining the modification and creating the cohort are not visible in this excerpt.
272 ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
273 final DataTreeModification snapshot = transaction.getSnapshot();
276 return createReadyCohort(transaction.getId(), snapshot);
// Reads the node at the given path from a fresh snapshot of the data tree.
279 public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
280 return dataTree.takeSnapshot().readNode(path);
// Returns a new snapshot of the underlying data tree.
283 public DataTreeSnapshot takeSnapshot() {
284 return dataTree.takeSnapshot();
// Returns a new modification based on a fresh snapshot of the underlying data tree.
287 public DataTreeModification newModification() {
288 return dataTree.takeSnapshot().newModification();
// Readies the modification and applies it straight to the data tree: validate, prepare, commit.
// The visible code neither goes through the pending queue nor notifies listeners.
// Throws DataValidationFailedException, as declared, when validation fails.
// NOTE(review): the return statement is not visible in this excerpt; presumably the prepared
// candidate is returned - confirm.
292 // FIXME: This should be removed, it violates encapsulation
293 public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
294 modification.ready();
295 dataTree.validate(modification);
296 DataTreeCandidateTip candidate = dataTree.prepare(modification);
297 dataTree.commit(candidate);
// Copies the cohorts of all queued entries, in queue order, into a new list and then empties the queue.
// NOTE(review): the return statement is not visible in this excerpt; presumably the copied list is
// returned - confirm.
301 public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
302 Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
303 for(CommitEntry entry: pendingTransactions) {
304 ret.add(entry.cohort);
307 pendingTransactions.clear();
// Drives the canCommit step for the head of the pending queue. While the queue is non-empty, the
// head cohort's modification is validated against the data tree. On success the cohort is told that
// canCommit succeeded and the entry's lastAccess is refreshed from the shard ticker. On failure the
// cause is reported through failedCanCommit(), the entry is removed and the loop moves on to the
// next transaction. Afterwards any operation waiting for the queue to drain gets a chance to run.
// NOTE(review): several lines of this method (the state-check branch body, the try header, the
// success-path exit, continuation lines of two log calls and the generic handler's cause
// assignment) are not visible in this excerpt.
311 private void processNextTransaction() {
312 while (!pendingTransactions.isEmpty()) {
313 final CommitEntry entry = pendingTransactions.peek();
314 final SimpleShardDataTreeCohort cohort = entry.cohort;
315 final DataTreeModification modification = cohort.getDataTreeModification();
// Only a head transaction in CAN_COMMIT_PENDING state is validated below.
317 if(cohort.getState() != State.CAN_COMMIT_PENDING) {
321 LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
324 dataTree.validate(modification);
325 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
326 cohort.successfulCanCommit();
327 entry.lastAccess = shard.ticker().read();
329 } catch (ConflictingModificationAppliedException e) {
330 LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
332 cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
333 } catch (DataValidationFailedException e) {
334 LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
337 // For debugging purposes, allow dumping of the modification. Coupled with the above
338 // precondition log, it should allow us to understand what went on.
// logContext fills the leading placeholder, so all four placeholders receive their intended values.
339 LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(), modification, dataTree);
340 cause = new TransactionCommitFailedException("Data did not pass validation.", e);
341 } catch (Exception e) {
342 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
346 // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
347 pendingTransactions.poll().cohort.failedCanCommit(cause);
350 maybeRunOperationOnPendingTransactionsComplete();
// Requests the canCommit step for a cohort. The request is compared with the head of the pending
// queue; a debug message is logged for a cohort that is not at the head, and processNextTransaction()
// performs the actual validation.
// NOTE(review): pendingTransactions.peek() is dereferenced without a null check, unlike
// startPreCommit()/startCommit(); an empty queue would throw NullPointerException here - confirm
// callers guarantee a non-empty queue.
// NOTE(review): the remainder of the mismatch branch is not visible in this excerpt.
353 void startCanCommit(final SimpleShardDataTreeCohort cohort) {
354 final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
355 if (!cohort.equals(current)) {
356 LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
360 processNextTransaction();
// Fails the pre-commit of the head transaction: bumps the shard's failed-transaction counter,
// removes the head entry and reports the cause to its cohort, then resumes processing the queue.
363 private void failPreCommit(final Exception cause) {
364 shard.getShardMBean().incrementFailedTransactionsCount();
365 pendingTransactions.poll().cohort.failedPreCommit(cause);
366 processNextTransaction();
// Runs the pre-commit step for the cohort at the head of the pending queue: the modification is
// prepared into a candidate, the cohort's user pre-commit hook is invoked with it, and on success
// the entry's lastAccess is refreshed and the cohort is told that pre-commit succeeded.
// Throws IllegalStateException when the queue is empty; Verify.verify fails when the given cohort is
// not the head of the queue.
// NOTE(review): the bodies of both catch clauses are not visible in this excerpt; presumably they
// route the failure through failPreCommit() - confirm.
369 void startPreCommit(final SimpleShardDataTreeCohort cohort) {
370 final CommitEntry entry = pendingTransactions.peek();
371 Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
373 final SimpleShardDataTreeCohort current = entry.cohort;
374 Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
375 final DataTreeCandidateTip candidate;
377 candidate = dataTree.prepare(cohort.getDataTreeModification());
378 } catch (Exception e) {
384 cohort.userPreCommit(candidate);
385 } catch (ExecutionException | TimeoutException e) {
390 entry.lastAccess = shard.ticker().read();
391 cohort.successfulPreCommit(candidate);
// Fails the commit of the head transaction: bumps the shard's failed-transaction counter, removes
// the head entry and reports the cause to its cohort, then resumes processing the queue.
394 private void failCommit(final Exception cause) {
395 shard.getShardMBean().incrementFailedTransactionsCount();
396 pendingTransactions.poll().cohort.failedCommit(cause);
397 processNextTransaction();
// Completes the commit of a cohort: its candidate is committed to the data tree, the shard's commit
// statistics are updated, the head entry is removed and its cohort is told the commit succeeded
// (with UnsignedLong.ZERO as the reported index), listeners are notified and queue processing resumes.
// If the tree rejects the candidate with IllegalStateException, the candidate is re-applied as a
// foreign candidate instead.
// NOTE(review): applyForeignCandidate() already calls notifyListeners(), and this method notifies
// again afterwards, so listeners may see two notifications on the retry path - confirm.
// NOTE(review): the try headers and the body of the generic exception handler are not visible in
// this excerpt; presumably that handler calls failCommit() and stops - confirm.
400 private void finishCommit(final SimpleShardDataTreeCohort cohort) {
401 final TransactionIdentifier txId = cohort.getIdentifier();
402 final DataTreeCandidate candidate = cohort.getCandidate();
404 LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
408 dataTree.commit(candidate);
409 } catch (IllegalStateException e) {
410 // We may get a "store tree and candidate base differ" IllegalStateException from commit under
411 // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
412 // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
413 // applying it to the state. We then become the leader and a second tx is pre-committed and
414 // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
415 // candidate via applyState prior to the second tx. Since the second tx has already been
416 // pre-committed, when it gets here to commit it will get an IllegalStateException.
418 // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
419 // solution will be forthcoming.
421 LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
422 applyForeignCandidate(txId, candidate);
424 } catch (Exception e) {
429 shard.getShardMBean().incrementCommittedTransactionCount();
430 shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
432 // FIXME: propagate journal index
434 pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
436 LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
437 notifyListeners(candidate);
439 processNextTransaction();
// Starts the commit step for the cohort at the head of the pending queue. When the shard can skip
// the payload or the candidate's root is UNMODIFIED, the commit is finished immediately. Otherwise
// the candidate is encoded into a CommitTransactionPayload and handed to the shard for persistence;
// the commit then continues through payloadReplicationComplete().
// Throws IllegalStateException when the queue is empty; Verify.verify fails when the given cohort is
// not the head of the queue.
// NOTE(review): on an encoding failure the head entry is failed directly rather than through
// failCommit(), so the visible code neither bumps the failed-transaction counter nor resumes queue
// processing on that path - confirm whether that is intended.
// NOTE(review): the try header and the exits of the early-finish and failure branches are not
// visible in this excerpt.
442 void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
443 final CommitEntry entry = pendingTransactions.peek();
444 Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
446 final SimpleShardDataTreeCohort current = entry.cohort;
447 Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
449 if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
450 LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
451 finishCommit(cohort);
455 final TransactionIdentifier txId = cohort.getIdentifier();
456 final Payload payload;
458 payload = CommitTransactionPayload.create(txId, candidate);
459 } catch (IOException e) {
460 LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
461 pendingTransactions.poll().cohort.failedCommit(e);
465 // Once completed, we will continue via payloadReplicationComplete
466 entry.lastAccess = shard.ticker().read();
467 shard.persistPayload(txId, payload);
468 LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
// Handles completed replication of a transaction payload. A warning is logged when the queue is
// empty or when its head is a different transaction; for a matching head the commit is finished.
// The payload argument is not used by the visible code.
// NOTE(review): the exits of the two warning branches are not visible in this excerpt; presumably
// both return without committing, as the "ignoring" messages suggest - confirm.
471 private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
472 final CommitEntry current = pendingTransactions.peek();
473 if (current == null) {
474 LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
478 if (!current.cohort.getIdentifier().equals(txId)) {
479 LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
480 current.cohort.getIdentifier(), txId);
484 finishCommit(current.cohort);
// Entry point for replication-complete events keyed by a generic identifier. Only
// TransactionIdentifier is accepted: Verify.verify fails for any other identifier type, and the call
// is then forwarded to the transaction-specific overload.
487 void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
488 // For now we do not care about anything else but transactions
489 Verify.verify(identifier instanceof TransactionIdentifier);
490 payloadReplicationComplete((TransactionIdentifier)identifier, payload);
// Forwards a cohort registry command, together with its sender, to the cohort registry.
493 void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
494 cohortRegistry.process(sender, message);
// Creates a cohort for a readied modification and appends it to the pending queue. The cohort is
// given a user cohort created by the registry for the current schema context with COMMIT_STEP_TIMEOUT,
// and its queue entry is stamped with the current shard ticker reading.
// NOTE(review): the return statement is not visible in this excerpt; presumably the new cohort is
// returned - confirm.
497 ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
498 final DataTreeModification modification) {
499 SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
500 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
501 pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
// Applies replicated state by extracting the candidate from the payload and applying it as a foreign
// candidate. Declares IOException and DataValidationFailedException.
505 void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
506 throws DataValidationFailedException, IOException {
507 applyForeignCandidate(identifier, payload.getCandidate().getValue());
// Expires the head of the pending queue when it has been idle too long. The timeout is converted to
// nanoseconds and compared with shard ticker readings: the head is considered timed out when its
// lastAccess plus the timeout lies before the current reading. A timed-out head is handled by state:
//   CAN_COMMIT_PENDING  - removed and failed via failedCanCommit(TimeoutException)
//   CAN_COMMIT_COMPLETE - removed and failed via reportFailure(TimeoutException)
//   PRE_COMMIT_PENDING  - removed and failed via failedPreCommit(TimeoutException)
//   PRE_COMMIT_COMPLETE - removed and failed via reportFailure(TimeoutException)
// A further branch only warns that the transaction is still committing and resets its lastAccess to
// the current reading; another removes the head without reporting anything.
// NOTE(review): the case labels of those last two branches, the break statements and the use of
// processNext are not visible in this excerpt.
510 void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
511 final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
512 final long now = shard.ticker().read();
513 final CommitEntry currentTx = pendingTransactions.peek();
514 if (currentTx != null && currentTx.lastAccess + timeout < now) {
515 LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
516 currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
517 boolean processNext = true;
518 switch (currentTx.cohort.getState()) {
519 case CAN_COMMIT_PENDING:
520 pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
522 case CAN_COMMIT_COMPLETE:
523 pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
525 case PRE_COMMIT_PENDING:
526 pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
528 case PRE_COMMIT_COMPLETE:
529 // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
530 // are ready we should commit the transaction, not abort it. Our current software stack does
531 // not allow us to do that consistently, because we persist at the time of commit, hence
532 // we can end up in a state where we have pre-committed a transaction, then a leader failover
533 // occurred ... the new leader does not see the pre-committed transaction and does not have
534 // a running timer. To fix this we really need two persistence events.
536 // The first one, done at pre-commit time will hold the transaction payload. When consensus
537 // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
538 // apply the state in this event.
540 // The second one, done at commit (or abort) time holds only the transaction identifier and
541 // signals to followers that the state should (or should not) be applied.
543 // In order to make the pre-commit timer working across failovers, though, we need
544 // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
545 // restart the timer.
546 pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
549 LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
550 currentTx.cohort.getIdentifier());
551 currentTx.lastAccess = now;
559 pendingTransactions.poll();
563 processNextTransaction();
// Aborts a cohort by locating it in the pending queue. The head of the queue is treated specially:
// unless it is in COMMIT_PENDING state it is removed and queue processing resumes; a head that is
// already committing is left alone and a warning is logged. Entries behind the head are searched
// for a match; a debug message is logged when the cohort is not found at all.
// NOTE(review): the empty-queue check, the else/return lines and the removal of a matched non-head
// entry are not visible in this excerpt.
568 void startAbort(final SimpleShardDataTreeCohort cohort) {
569 final Iterator<CommitEntry> it = pendingTransactions.iterator();
571 LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
575 // First entry is special, as it may already be committing
576 final CommitEntry first = it.next();
577 if (cohort.equals(first.cohort)) {
578 if (cohort.getState() != State.COMMIT_PENDING) {
// The last placeholder reports the cohort's state, matching the "in state {}" wording.
579 LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
580 cohort.getState());
581 pendingTransactions.poll();
582 processNextTransaction();
584 LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
590 while (it.hasNext()) {
591 final CommitEntry e = it.next();
592 if (cohort.equals(e.cohort)) {
593 LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
599 LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
// Registers an operation to run once the pending queue is empty, replacing any previously registered
// one, and runs it right away if the queue is already empty.
602 void setRunOnPendingTransactionsComplete(final Runnable operation) {
603 runOnPendingTransactionsComplete = operation;
604 maybeRunOperationOnPendingTransactionsComplete();
// Runs the registered drain operation when one is set and the pending queue is empty, then clears
// it so that it runs at most once per registration. Does nothing otherwise.
607 private void maybeRunOperationOnPendingTransactionsComplete() {
608 if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
609 LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
610 runOnPendingTransactionsComplete);
612 runOnPendingTransactionsComplete.run();
613 runOnPendingTransactionsComplete = null;