/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.slf4j.Logger;

36 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
38 * @author Thomas Pantelis
40 class ShardCommitCoordinator {
42 // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43 public interface CohortDecorator {
44 ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
47 private final Map<String, CohortEntry> cohortCache = new HashMap<>();
49 private CohortEntry currentCohortEntry;
51 private final ShardDataTree dataTree;
53 // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54 // since this should only be accessed on the shard's dispatcher.
55 private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
57 private int queueCapacity;
59 private final Logger log;
61 private final String name;
63 private final long cacheExpiryTimeoutInMillis;
65 // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66 private CohortDecorator cohortDecorator;
68 private ReadyTransactionReply readyTransactionReply;
70 ShardCommitCoordinator(ShardDataTree dataTree,
71 long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
73 this.queueCapacity = queueCapacity;
76 this.dataTree = Preconditions.checkNotNull(dataTree);
77 this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
81 return queuedCohortEntries.size();
84 void setQueueCapacity(int queueCapacity) {
85 this.queueCapacity = queueCapacity;
88 private ReadyTransactionReply readyTransactionReply(Shard shard) {
89 if(readyTransactionReply == null) {
90 readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
93 return readyTransactionReply;
96 private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
97 if(queuedCohortEntries.size() < queueCapacity) {
98 queuedCohortEntries.offer(cohortEntry);
101 cohortCache.remove(cohortEntry.getTransactionID());
103 RuntimeException ex = new RuntimeException(
104 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
105 " capacity %d has been reached.",
106 name, cohortEntry.getTransactionID(), queueCapacity));
107 log.error(ex.getMessage());
108 sender.tell(new Status.Failure(ex), shard.self());
114 * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
115 * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
117 * @param ready the ForwardedReadyTransaction message to process
118 * @param sender the sender of the message
119 * @param shard the transaction's shard actor
121 void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
122 log.debug("{}: Readying transaction {}, client version {}", name,
123 ready.getTransactionID(), ready.getTxnClientVersion());
125 CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
126 (MutableCompositeModification) ready.getModification());
127 cohortCache.put(ready.getTransactionID(), cohortEntry);
129 if(!queueCohortEntry(cohortEntry, sender, shard)) {
133 if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
134 // Return our actor path as we'll handle the three phase commit except if the Tx client
135 // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
136 // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
137 // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
138 ActorRef replyActorPath = shard.self();
139 if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
140 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
141 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
142 ready.getTransactionID()));
145 ReadyTransactionReply readyTransactionReply =
146 new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
147 ready.getTxnClientVersion());
148 sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
149 readyTransactionReply, shard.self());
151 if(ready.isDoImmediateCommit()) {
152 cohortEntry.setDoImmediateCommit(true);
153 cohortEntry.setReplySender(sender);
154 cohortEntry.setShard(shard);
155 handleCanCommit(cohortEntry);
157 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
158 // front-end so send back a ReadyTransactionReply with our actor path.
159 sender.tell(readyTransactionReply(shard), shard.self());
165 * This method handles a BatchedModifications message for a transaction being prepared directly on the
166 * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
167 * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
168 * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
170 * @param batched the BatchedModifications message to process
171 * @param sender the sender of the message
172 * @param shard the transaction's shard actor
174 void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
175 CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
176 if(cohortEntry == null) {
177 cohortEntry = new CohortEntry(batched.getTransactionID(),
178 dataTree.newReadWriteTransaction(batched.getTransactionID(),
179 batched.getTransactionChainID()));
180 cohortCache.put(batched.getTransactionID(), cohortEntry);
183 if(log.isDebugEnabled()) {
184 log.debug("{}: Applying {} batched modifications for Tx {}", name,
185 batched.getModifications().size(), batched.getTransactionID());
188 cohortEntry.applyModifications(batched.getModifications());
190 if(batched.isReady()) {
191 if(cohortEntry.getLastBatchedModificationsException() != null) {
192 cohortCache.remove(cohortEntry.getTransactionID());
193 throw cohortEntry.getLastBatchedModificationsException();
196 if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
197 cohortCache.remove(cohortEntry.getTransactionID());
198 throw new IllegalStateException(String.format(
199 "The total number of batched messages received %d does not match the number sent %d",
200 cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
203 if(!queueCohortEntry(cohortEntry, sender, shard)) {
207 if(log.isDebugEnabled()) {
208 log.debug("{}: Readying Tx {}, client version {}", name,
209 batched.getTransactionID(), batched.getVersion());
212 cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
214 if(batched.isDoCommitOnReady()) {
215 cohortEntry.setReplySender(sender);
216 cohortEntry.setShard(shard);
217 handleCanCommit(cohortEntry);
219 sender.tell(readyTransactionReply(shard), shard.self());
222 sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
227 * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
228 * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
230 * @param message the ReadyLocalTransaction message to process
231 * @param sender the sender of the message
232 * @param shard the transaction's shard actor
234 void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
235 final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
236 message.getTransactionID());
237 final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
238 cohortCache.put(message.getTransactionID(), cohortEntry);
239 cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
241 if(!queueCohortEntry(cohortEntry, sender, shard)) {
245 log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
247 if (message.isDoCommitOnReady()) {
248 cohortEntry.setReplySender(sender);
249 cohortEntry.setShard(shard);
250 handleCanCommit(cohortEntry);
252 sender.tell(readyTransactionReply(shard), shard.self());
256 private void handleCanCommit(CohortEntry cohortEntry) {
257 String transactionID = cohortEntry.getTransactionID();
259 cohortEntry.updateLastAccessTime();
261 if(currentCohortEntry != null) {
262 // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
263 // queue and will get processed after all prior entries complete.
265 if(log.isDebugEnabled()) {
266 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
267 name, currentCohortEntry.getTransactionID(), transactionID);
273 // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
274 // it the current entry and proceed with canCommit.
275 // Purposely checking reference equality here.
276 if(queuedCohortEntries.peek() == cohortEntry) {
277 currentCohortEntry = queuedCohortEntries.poll();
278 doCanCommit(currentCohortEntry);
280 if(log.isDebugEnabled()) {
281 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
282 name, queuedCohortEntries.peek().getTransactionID(), transactionID);
288 * This method handles the canCommit phase for a transaction.
290 * @param transactionID the ID of the transaction to canCommit
291 * @param sender the actor to which to send the response
292 * @param shard the transaction's shard actor
294 void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
295 // Lookup the cohort entry that was cached previously (or should have been) by
296 // transactionReady (via the ForwardedReadyTransaction message).
297 final CohortEntry cohortEntry = cohortCache.get(transactionID);
298 if(cohortEntry == null) {
299 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
300 // between canCommit and ready and the entry was expired from the cache.
301 IllegalStateException ex = new IllegalStateException(
302 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
303 log.error(ex.getMessage());
304 sender.tell(new Status.Failure(ex), shard.self());
308 cohortEntry.setReplySender(sender);
309 cohortEntry.setShard(shard);
311 handleCanCommit(cohortEntry);
314 private void doCanCommit(final CohortEntry cohortEntry) {
315 boolean canCommit = false;
317 // We block on the future here so we don't have to worry about possibly accessing our
318 // state on a different thread outside of our dispatcher. Also, the data store
319 // currently uses a same thread executor anyway.
320 canCommit = cohortEntry.getCohort().canCommit().get();
322 log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
324 if(cohortEntry.isDoImmediateCommit()) {
326 doCommit(cohortEntry);
328 cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
329 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
332 cohortEntry.getReplySender().tell(
333 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
334 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
336 } catch (Exception e) {
337 log.debug("{}: An exception occurred during canCommit", name, e);
339 Throwable failure = e;
340 if(e instanceof ExecutionException) {
341 failure = e.getCause();
344 cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
347 // Remove the entry from the cache now.
348 currentTransactionComplete(cohortEntry.getTransactionID(), true);
353 private boolean doCommit(CohortEntry cohortEntry) {
354 log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
356 boolean success = false;
358 // We perform the preCommit phase here atomically with the commit phase. This is an
359 // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
360 // coordination of preCommit across shards in case of failure but preCommit should not
361 // normally fail since we ensure only one concurrent 3-phase commit.
364 // We block on the future here so we don't have to worry about possibly accessing our
365 // state on a different thread outside of our dispatcher. Also, the data store
366 // currently uses a same thread executor anyway.
367 cohortEntry.getCohort().preCommit().get();
369 cohortEntry.getShard().continueCommit(cohortEntry);
371 cohortEntry.updateLastAccessTime();
374 } catch (Exception e) {
375 log.error("{} An exception occurred while preCommitting transaction {}",
376 name, cohortEntry.getTransactionID(), e);
377 cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
379 currentTransactionComplete(cohortEntry.getTransactionID(), true);
386 * This method handles the preCommit and commit phases for a transaction.
388 * @param transactionID the ID of the transaction to commit
389 * @param sender the actor to which to send the response
390 * @param shard the transaction's shard actor
391 * @return true if the transaction was successfully prepared, false otherwise.
393 boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
394 // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
396 final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
397 if(cohortEntry == null) {
398 // We're not the current Tx - the Tx was likely expired b/c it took too long in
399 // between the canCommit and commit messages.
400 IllegalStateException ex = new IllegalStateException(
401 String.format("%s: Cannot commit transaction %s - it is not the current transaction",
402 name, transactionID));
403 log.error(ex.getMessage());
404 sender.tell(new akka.actor.Status.Failure(ex), shard.self());
408 cohortEntry.setReplySender(sender);
409 return doCommit(cohortEntry);
413 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
414 * matches the current entry.
416 * @param transactionID the ID of the transaction
417 * @return the current CohortEntry or null if the given transaction ID does not match the
420 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
421 if(isCurrentTransaction(transactionID)) {
422 return currentCohortEntry;
428 public CohortEntry getCurrentCohortEntry() {
429 return currentCohortEntry;
432 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
433 return cohortCache.remove(transactionID);
436 public boolean isCurrentTransaction(String transactionID) {
437 return currentCohortEntry != null &&
438 currentCohortEntry.getTransactionID().equals(transactionID);
442 * This method is called when a transaction is complete, successful or not. If the given
443 * given transaction ID matches the current in-progress transaction, the next cohort entry,
444 * if any, is dequeued and processed.
446 * @param transactionID the ID of the completed transaction
447 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
450 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
451 if(removeCohortEntry) {
452 cohortCache.remove(transactionID);
455 if(isCurrentTransaction(transactionID)) {
456 currentCohortEntry = null;
458 log.debug("{}: currentTransactionComplete: {}", name, transactionID);
460 maybeProcessNextCohortEntry();
464 private void maybeProcessNextCohortEntry() {
465 // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
466 // clean out expired entries.
467 Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
468 while(iter.hasNext()) {
469 CohortEntry next = iter.next();
470 if(next.isReadyToCommit()) {
471 if(currentCohortEntry == null) {
472 if(log.isDebugEnabled()) {
473 log.debug("{}: Next entry to canCommit {}", name, next);
477 currentCohortEntry = next;
478 currentCohortEntry.updateLastAccessTime();
479 doCanCommit(currentCohortEntry);
483 } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
484 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
485 name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
488 cohortCache.remove(next.getTransactionID());
495 void cleanupExpiredCohortEntries() {
496 maybeProcessNextCohortEntry();
500 void setCohortDecorator(CohortDecorator cohortDecorator) {
501 this.cohortDecorator = cohortDecorator;
504 static class CohortEntry {
505 private final String transactionID;
506 private ShardDataTreeCohort cohort;
507 private final ReadWriteShardDataTreeTransaction transaction;
508 private RuntimeException lastBatchedModificationsException;
509 private ActorRef replySender;
511 private boolean doImmediateCommit;
512 private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
513 private int totalBatchedModificationsReceived;
515 CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
516 this.transaction = Preconditions.checkNotNull(transaction);
517 this.transactionID = transactionID;
520 CohortEntry(String transactionID, ShardDataTreeCohort cohort,
521 MutableCompositeModification compositeModification) {
522 this.transactionID = transactionID;
523 this.cohort = cohort;
524 this.transaction = null;
527 CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
528 this.transactionID = transactionID;
529 this.cohort = cohort;
530 this.transaction = null;
533 void updateLastAccessTime() {
534 lastAccessTimer.reset();
535 lastAccessTimer.start();
538 String getTransactionID() {
539 return transactionID;
542 ShardDataTreeCohort getCohort() {
546 int getTotalBatchedModificationsReceived() {
547 return totalBatchedModificationsReceived;
550 RuntimeException getLastBatchedModificationsException() {
551 return lastBatchedModificationsException;
554 void applyModifications(Iterable<Modification> modifications) {
555 totalBatchedModificationsReceived++;
556 if(lastBatchedModificationsException == null) {
557 for (Modification modification : modifications) {
559 modification.apply(transaction.getSnapshot());
560 } catch (RuntimeException e) {
561 lastBatchedModificationsException = e;
568 void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
569 Preconditions.checkState(cohort == null, "cohort was already set");
571 setDoImmediateCommit(doImmediateCommit);
573 cohort = transaction.ready();
575 if(cohortDecorator != null) {
576 // Call the hook for unit tests.
577 cohort = cohortDecorator.decorate(transactionID, cohort);
581 boolean isReadyToCommit() {
582 return replySender != null;
585 boolean isExpired(long expireTimeInMillis) {
586 return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
589 boolean isDoImmediateCommit() {
590 return doImmediateCommit;
593 void setDoImmediateCommit(boolean doImmediateCommit) {
594 this.doImmediateCommit = doImmediateCommit;
597 ActorRef getReplySender() {
601 void setReplySender(ActorRef replySender) {
602 this.replySender = replySender;
609 void setShard(Shard shard) {
614 public String toString() {
615 StringBuilder builder = new StringBuilder();
616 builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
617 .append(doImmediateCommit).append("]");
618 return builder.toString();