2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.Modification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.slf4j.Logger;
// No locking is used anywhere in this class; its mutable state appears intended to be confined to the
// owning shard's dispatcher. NOTE(review): confirm before sharing an instance across threads.
37 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
39 * @author Thomas Pantelis
41 class ShardCommitCoordinator {
43 // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
44 public interface CohortDecorator {
45 ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
48 private final Map<String, CohortEntry> cohortCache = new HashMap<>();
50 private CohortEntry currentCohortEntry;
52 private final ShardDataTree dataTree;
54 // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
55 // since this should only be accessed on the shard's dispatcher.
56 private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
58 private int queueCapacity;
60 private final Logger log;
62 private final String name;
64 private final long cacheExpiryTimeoutInMillis;
66 // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
67 private CohortDecorator cohortDecorator;
69 private ReadyTransactionReply readyTransactionReply;
/**
 * Creates a coordinator for a shard's data tree.
 *
 * @param dataTree the shard's data tree; must not be null
 * @param cacheExpiryTimeoutInMillis how long an entry may stay idle in the commit queue before it is
 *        treated as expired
 * @param queueCapacity the maximum number of transactions allowed to wait in the commit queue
 * @param log the logger to use
 * @param name the prefix for log messages (presumably the shard's name)
 * @throws NullPointerException if {@code dataTree} is null
 */
ShardCommitCoordinator(ShardDataTree dataTree,
        long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
    this.queueCapacity = queueCapacity;
    // log and name are final fields and must be definitely assigned here.
    this.log = log;
    this.name = name;
    this.dataTree = Preconditions.checkNotNull(dataTree);
    this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
}
// Reports how many cohort entries are currently waiting in the commit queue.
// NOTE(review): the header of the enclosing method is not visible in this listing.
82 return queuedCohortEntries.size();
85 void setQueueCapacity(int queueCapacity) {
86 this.queueCapacity = queueCapacity;
89 private ReadyTransactionReply readyTransactionReply(Shard shard) {
90 if(readyTransactionReply == null) {
91 readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
94 return readyTransactionReply;
97 private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
98 if(queuedCohortEntries.size() < queueCapacity) {
99 queuedCohortEntries.offer(cohortEntry);
101 log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
102 queuedCohortEntries.size());
106 cohortCache.remove(cohortEntry.getTransactionID());
108 RuntimeException ex = new RuntimeException(
109 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
110 " capacity %d has been reached.",
111 name, cohortEntry.getTransactionID(), queueCapacity));
112 log.error(ex.getMessage());
113 sender.tell(new Status.Failure(ex), shard.self());
119 * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
120 * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
122 * @param ready the ForwardedReadyTransaction message to process
123 * @param sender the sender of the message
124 * @param shard the transaction's shard actor
126 void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
127 log.debug("{}: Readying transaction {}, client version {}", name,
128 ready.getTransactionID(), ready.getTxnClientVersion());
130 ShardDataTreeCohort cohort = ready.getTransaction().ready();
131 CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
132 cohortCache.put(ready.getTransactionID(), cohortEntry);
134 if(!queueCohortEntry(cohortEntry, sender, shard)) {
138 if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
139 // Return our actor path as we'll handle the three phase commit except if the Tx client
140 // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
141 // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
142 // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
143 ActorRef replyActorPath = shard.self();
144 if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
145 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
146 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
147 ready.getTransactionID()));
150 ReadyTransactionReply readyTransactionReply =
151 new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
152 ready.getTxnClientVersion());
153 sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
154 readyTransactionReply, shard.self());
156 if(ready.isDoImmediateCommit()) {
157 cohortEntry.setDoImmediateCommit(true);
158 cohortEntry.setReplySender(sender);
159 cohortEntry.setShard(shard);
160 handleCanCommit(cohortEntry);
162 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
163 // front-end so send back a ReadyTransactionReply with our actor path.
164 sender.tell(readyTransactionReply(shard), shard.self());
170 * This method handles a BatchedModifications message for a transaction being prepared directly on the
171 * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
172 * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
173 * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
175 * @param batched the BatchedModifications message to process
176 * @param sender the sender of the message
177 * @param shard the transaction's shard actor
179 void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
180 CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
181 if(cohortEntry == null) {
182 cohortEntry = new CohortEntry(batched.getTransactionID(),
183 dataTree.newReadWriteTransaction(batched.getTransactionID(),
184 batched.getTransactionChainID()));
185 cohortCache.put(batched.getTransactionID(), cohortEntry);
188 if(log.isDebugEnabled()) {
189 log.debug("{}: Applying {} batched modifications for Tx {}", name,
190 batched.getModifications().size(), batched.getTransactionID());
193 cohortEntry.applyModifications(batched.getModifications());
195 if(batched.isReady()) {
196 if(cohortEntry.getLastBatchedModificationsException() != null) {
197 cohortCache.remove(cohortEntry.getTransactionID());
198 throw cohortEntry.getLastBatchedModificationsException();
201 if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
202 cohortCache.remove(cohortEntry.getTransactionID());
203 throw new IllegalStateException(String.format(
204 "The total number of batched messages received %d does not match the number sent %d",
205 cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
208 if(!queueCohortEntry(cohortEntry, sender, shard)) {
212 if(log.isDebugEnabled()) {
213 log.debug("{}: Readying Tx {}, client version {}", name,
214 batched.getTransactionID(), batched.getVersion());
217 cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
219 if(batched.isDoCommitOnReady()) {
220 cohortEntry.setReplySender(sender);
221 cohortEntry.setShard(shard);
222 handleCanCommit(cohortEntry);
224 sender.tell(readyTransactionReply(shard), shard.self());
227 sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
232 * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
233 * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
235 * @param message the ReadyLocalTransaction message to process
236 * @param sender the sender of the message
237 * @param shard the transaction's shard actor
239 void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
240 final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
241 message.getTransactionID());
242 final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
243 cohortCache.put(message.getTransactionID(), cohortEntry);
244 cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
246 if(!queueCohortEntry(cohortEntry, sender, shard)) {
250 log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
252 if (message.isDoCommitOnReady()) {
253 cohortEntry.setReplySender(sender);
254 cohortEntry.setShard(shard);
255 handleCanCommit(cohortEntry);
257 sender.tell(readyTransactionReply(shard), shard.self());
261 private void handleCanCommit(CohortEntry cohortEntry) {
262 String transactionID = cohortEntry.getTransactionID();
264 cohortEntry.updateLastAccessTime();
266 if(currentCohortEntry != null) {
267 // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
268 // queue and will get processed after all prior entries complete.
270 if(log.isDebugEnabled()) {
271 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
272 name, currentCohortEntry.getTransactionID(), transactionID);
278 // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
279 // it the current entry and proceed with canCommit.
280 // Purposely checking reference equality here.
281 if(queuedCohortEntries.peek() == cohortEntry) {
282 currentCohortEntry = queuedCohortEntries.poll();
283 doCanCommit(currentCohortEntry);
285 if(log.isDebugEnabled()) {
286 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
287 name, queuedCohortEntries.peek().getTransactionID(), transactionID);
293 * This method handles the canCommit phase for a transaction.
295 * @param transactionID the ID of the transaction to canCommit
296 * @param sender the actor to which to send the response
297 * @param shard the transaction's shard actor
299 void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
300 // Lookup the cohort entry that was cached previously (or should have been) by
301 // transactionReady (via the ForwardedReadyTransaction message).
302 final CohortEntry cohortEntry = cohortCache.get(transactionID);
303 if(cohortEntry == null) {
304 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
305 // between canCommit and ready and the entry was expired from the cache.
306 IllegalStateException ex = new IllegalStateException(
307 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
308 log.error(ex.getMessage());
309 sender.tell(new Status.Failure(ex), shard.self());
313 cohortEntry.setReplySender(sender);
314 cohortEntry.setShard(shard);
316 handleCanCommit(cohortEntry);
// Runs canCommit for the given (current) entry and replies to its reply sender:
//  - immediate-commit entries: commit straight away, or reply with a Status.Failure wrapping a
//    TransactionCommitFailedException when canCommit is negative;
//  - other entries: reply with the serialized CanCommitTransactionReply YES or NO.
// Any exception is logged at debug level and replied as a Status.Failure; an ExecutionException is
// unwrapped to its cause first.
// NOTE(review): several structural lines (try / else / finally and any condition around the final call)
// are not visible in this listing, so exactly when currentTransactionComplete(..., true) runs cannot be
// confirmed here.
319 private void doCanCommit(final CohortEntry cohortEntry) {
320 boolean canCommit = false;
// Blocks until the cohort's canCommit future completes.
322 canCommit = cohortEntry.canCommit();
324 log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
326 if(cohortEntry.isDoImmediateCommit()) {
328 doCommit(cohortEntry);
330 cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
331 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
// Front-end-coordinated commit: just report the vote.
334 cohortEntry.getReplySender().tell(
335 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
336 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
338 } catch (Exception e) {
339 log.debug("{}: An exception occurred during canCommit", name, e);
341 Throwable failure = e;
// Report the underlying cause rather than the future's wrapper exception.
342 if(e instanceof ExecutionException) {
343 failure = e.getCause();
346 cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
349 // Remove the entry from the cache now.
350 currentTransactionComplete(cohortEntry.getTransactionID(), true);
// Runs preCommit for the entry, then hands it to the shard via continueCommit() to finish the commit,
// and refreshes the entry's access time.
// On any exception: logs it, replies with a Status.Failure to the entry's reply sender, and completes
// the current transaction, removing its entry from the cache.
// Returns the local success flag, which starts out false. NOTE(review): the lines that set it and
// return it are not visible in this listing; presumably it is true only when nothing threw.
355 private boolean doCommit(CohortEntry cohortEntry) {
356 log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
358 boolean success = false;
360 // We perform the preCommit phase here atomically with the commit phase. This is an
361 // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
362 // coordination of preCommit across shards in case of failure but preCommit should not
363 // normally fail since we ensure only one concurrent 3-phase commit.
366 cohortEntry.preCommit();
368 cohortEntry.getShard().continueCommit(cohortEntry);
370 cohortEntry.updateLastAccessTime();
373 } catch (Exception e) {
// NOTE(review): this log prefix lacks the ':' used by the other messages in this class.
374 log.error("{} An exception occurred while preCommitting transaction {}",
375 name, cohortEntry.getTransactionID(), e);
376 cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
378 currentTransactionComplete(cohortEntry.getTransactionID(), true);
385 * This method handles the preCommit and commit phases for a transaction.
387 * @param transactionID the ID of the transaction to commit
388 * @param sender the actor to which to send the response
389 * @param shard the transaction's shard actor
390 * @return true if the transaction was successfully prepared, false otherwise.
392 boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
393 // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
395 final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
396 if(cohortEntry == null) {
397 // We're not the current Tx - the Tx was likely expired b/c it took too long in
398 // between the canCommit and commit messages.
399 IllegalStateException ex = new IllegalStateException(
400 String.format("%s: Cannot commit transaction %s - it is not the current transaction",
401 name, transactionID));
402 log.error(ex.getMessage());
403 sender.tell(new akka.actor.Status.Failure(ex), shard.self());
407 cohortEntry.setReplySender(sender);
408 return doCommit(cohortEntry);
// Aborts a transaction.
// If it is the current in-progress transaction, its commit slot is released but its cached entry is
// kept (see the comment below). Otherwise the entry is taken out of the cache. When an entry is found,
// the shard's abort counter is incremented and an AbortTransactionReply is sent; if an exception occurs,
// it is logged and a Status.Failure is sent.
// NOTE(review): several lines are not visible in this listing: the else around the cache removal, what
// happens when no entry is found, the call that actually aborts the cohort, and any null checks on
// sender.
411 void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
412 CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
413 if(cohortEntry != null) {
414 // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
415 // aborted during replication in which case we may still commit locally if replication
// NOTE(review): the end of the sentence above is not visible; presumably "... succeeds."
417 currentTransactionComplete(transactionID, false);
419 cohortEntry = getAndRemoveCohortEntry(transactionID);
422 if(cohortEntry == null) {
426 log.debug("{}: Aborting transaction {}", name, transactionID);
428 final ActorRef self = shard.getSelf();
432 shard.getShardMBean().incrementAbortTransactionsCount();
435 sender.tell(new AbortTransactionReply().toSerializable(), self);
437 } catch (Exception e) {
438 log.error("{}: An exception happened during abort", name, e);
441 sender.tell(new akka.actor.Status.Failure(e), self);
447 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
448 * matches the current entry.
450 * @param transactionID the ID of the transaction
451 * @return the current CohortEntry or null if the given transaction ID does not match the
454 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
455 if(isCurrentTransaction(transactionID)) {
456 return currentCohortEntry;
462 public CohortEntry getCurrentCohortEntry() {
463 return currentCohortEntry;
466 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
467 return cohortCache.remove(transactionID);
470 public boolean isCurrentTransaction(String transactionID) {
471 return currentCohortEntry != null &&
472 currentCohortEntry.getTransactionID().equals(transactionID);
476 * This method is called when a transaction is complete, successful or not. If the given
477 * transaction ID matches the current in-progress transaction, the next cohort entry,
478 * if any, is dequeued and processed.
480 * @param transactionID the ID of the completed transaction
481 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from the cache
// NOTE(review): the Javadoc delimiters around the comment above are not visible in this listing.
484 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
485 if(removeCohortEntry) {
486 cohortCache.remove(transactionID);
// Completing the in-progress transaction frees the single commit slot.
// NOTE(review): the line after the assignment below is not visible, so whether the log and
// maybeProcessNextCohortEntry() calls sit inside this if cannot be confirmed here; the Javadoc says
// the next entry is processed only when the ID matches the current transaction.
489 if(isCurrentTransaction(transactionID)) {
490 currentCohortEntry = null;
492 log.debug("{}: currentTransactionComplete: {}", name, transactionID);
494 maybeProcessNextCohortEntry();
// Walks the commit queue from its head:
//  - an entry that is ready to commit (it has a reply sender) becomes the current entry and has its
//    canCommit started, provided no commit is already in progress;
//  - an expired entry (idle for at least cacheExpiryTimeoutInMillis) is logged as removed;
//  - whether the entry is aborted is also checked.
// The loop body ends by removing the entry's ID from the cohort cache.
// NOTE(review): several lines (loop exits and removals through the iterator) are not visible in this
// listing, so exactly which branches reach that removal, and when the walk stops, cannot be confirmed
// here.
498 private void maybeProcessNextCohortEntry() {
499 // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
500 // clean out expired entries.
501 Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
502 while(iter.hasNext()) {
503 CohortEntry next = iter.next();
504 if(next.isReadyToCommit()) {
505 if(currentCohortEntry == null) {
506 if(log.isDebugEnabled()) {
507 log.debug("{}: Next entry to canCommit {}", name, next);
// Promote the entry to the commit slot, resetting its idle timer before canCommit.
511 currentCohortEntry = next;
512 currentCohortEntry.updateLastAccessTime();
513 doCanCommit(currentCohortEntry);
517 } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
518 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
519 name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
520 } else if(!next.isAborted()) {
525 cohortCache.remove(next.getTransactionID());
529 void cleanupExpiredCohortEntries() {
530 maybeProcessNextCohortEntry();
534 void setCohortDecorator(CohortDecorator cohortDecorator) {
535 this.cohortDecorator = cohortDecorator;
// Per-transaction state tracked by the coordinator: the cohort (or the read-write transaction used to
// build one), the actor replies go to, and bookkeeping for batching, expiry and abort.
538 static class CohortEntry {
539 private final String transactionID;
// Null until ready() runs for entries created from a read-write transaction.
540 private ShardDataTreeCohort cohort;
// Null for entries created directly from a cohort.
541 private final ReadWriteShardDataTreeTransaction transaction;
// First exception thrown while applying batched modifications; later batches are not applied once set.
542 private RuntimeException lastBatchedModificationsException;
// Actor that commit-phase replies go to; a non-null value means the entry is ready to commit.
543 private ActorRef replySender;
// NOTE(review): setShard()/getShard() are used for this class, but the field they rely on is not
// visible in this listing.
545 private boolean doImmediateCommit;
// Restarted by updateLastAccessTime(); isExpired() compares its elapsed time to a timeout.
546 private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
// Count of applyModifications() calls, compared against the sender's total when the Tx is readied.
547 private int totalBatchedModificationsReceived;
548 private boolean aborted;
/**
 * Creates an entry that accumulates batched modifications in a read-write transaction. Its cohort is
 * created later, by ready().
 *
 * @param transactionID the transaction's ID
 * @param transaction the transaction to apply modifications to; must not be null
 */
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
    this.transactionID = transactionID;
    this.transaction = Preconditions.checkNotNull(transaction);
}
/**
 * Creates an entry for a transaction that has already been readied into a cohort. Such an entry has
 * no read-write transaction.
 *
 * @param transactionID the transaction's ID
 * @param cohort the transaction's cohort
 */
CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
    this.transaction = null;
    this.cohort = cohort;
    this.transactionID = transactionID;
}
561 void updateLastAccessTime() {
562 lastAccessTimer.reset();
563 lastAccessTimer.start();
566 String getTransactionID() {
567 return transactionID;
570 DataTreeCandidate getCandidate() {
571 return cohort.getCandidate();
574 int getTotalBatchedModificationsReceived() {
575 return totalBatchedModificationsReceived;
578 RuntimeException getLastBatchedModificationsException() {
579 return lastBatchedModificationsException;
// Applies one batch of modifications to the read-write transaction's snapshot and counts the batch.
// After a modification has failed, later batches are still counted but no longer applied; the recorded
// failure is reported when the transaction is readied.
582 void applyModifications(Iterable<Modification> modifications) {
583 totalBatchedModificationsReceived++;
584 if(lastBatchedModificationsException == null) {
585 for (Modification modification : modifications) {
// NOTE(review): the "try {" line and whatever follows the catch body (e.g. an exit from the loop) are
// not visible in this listing.
587 modification.apply(transaction.getSnapshot());
588 } catch (RuntimeException e) {
589 lastBatchedModificationsException = e;
596 boolean canCommit() throws InterruptedException, ExecutionException {
597 // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
598 // about possibly accessing our state on a different thread outside of our dispatcher.
599 // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
600 // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
601 // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
602 return cohort.canCommit().get();
605 void preCommit() throws InterruptedException, ExecutionException {
606 cohort.preCommit().get();
609 void commit() throws InterruptedException, ExecutionException {
610 cohort.commit().get();
// Aborts the cohort and waits for the abort to finish.
// NOTE(review): the statement between the header and the abort call is not visible in this listing;
// presumably it records the abort for isAborted() - confirm.
613 void abort() throws InterruptedException, ExecutionException {
615 cohort.abort().get();
618 void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
619 Preconditions.checkState(cohort == null, "cohort was already set");
621 setDoImmediateCommit(doImmediateCommit);
623 cohort = transaction.ready();
625 if(cohortDecorator != null) {
626 // Call the hook for unit tests.
627 cohort = cohortDecorator.decorate(transactionID, cohort);
631 boolean isReadyToCommit() {
632 return replySender != null;
635 boolean isExpired(long expireTimeInMillis) {
636 return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
639 boolean isDoImmediateCommit() {
640 return doImmediateCommit;
643 void setDoImmediateCommit(boolean doImmediateCommit) {
644 this.doImmediateCommit = doImmediateCommit;
// Returns the actor that this entry's commit-phase replies are sent to (set via setReplySender).
// NOTE(review): the method body is not visible in this listing.
647 ActorRef getReplySender() {
651 void setReplySender(ActorRef replySender) {
652 this.replySender = replySender;
// Records the shard actor that is used as the sender of replies and that is asked to continue the
// commit (getShard().continueCommit in the coordinator).
// NOTE(review): the method body is not visible in this listing.
659 void setShard(Shard shard) {
// Reports whether this entry has been aborted; maybeProcessNextCohortEntry consults it while cleaning
// the commit queue.
// NOTE(review): the method body is not visible in this listing.
664 boolean isAborted() {
// Short summary used in debug logging: only the transaction ID and the doImmediateCommit flag.
669 public String toString() {
670 StringBuilder builder = new StringBuilder();
671 builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
672 .append(doImmediateCommit).append("]");
673 return builder.toString();