import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
-import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
-import scala.concurrent.duration.Duration;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
*
* @author Thomas Pantelis
*/
-class ShardCommitCoordinator {
+final class ShardCommitCoordinator {
// Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
public interface CohortDecorator {
- ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+ ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
}
- private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+ private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
private CohortEntry currentCohortEntry;
private Runnable runOnPendingTransactionsComplete;
-
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
-
ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
String name) {
ready.getTransactionID(), ready.getTxnClientVersion());
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
- final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
- cohortCache.put(ready.getTransactionID(), cohortEntry);
+ final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
+ schema, ready.getTxnClientVersion());
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
*
* @param batched the BatchedModifications message to process
* @param sender the sender of the message
- * @param shard the transaction's shard actor
*/
- void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
+ void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
- cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
- cohortRegistry, schema, batched.getVersion());
- cohortCache.put(batched.getTransactionID(), cohortEntry);
+ cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
+ dataTree.newReadWriteTransaction(batched.getTransactionID()),
+ cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
}
if(log.isDebugEnabled()) {
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
- SchemaContext schema) {
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
- DataStoreVersions.CURRENT_VERSION);
- cohortCache.put(message.getTransactionID(), cohortEntry);
+ final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
+ dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
if(!queueCohortEntry(cohortEntry, sender, shard)) {
protected BatchedModifications getModifications() {
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(from.getTransactionID(),
- from.getVersion(), from.getTransactionChainID()));
+ newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
}
return newModifications.getLast();
}
private void handleCanCommit(CohortEntry cohortEntry) {
- String transactionID = cohortEntry.getTransactionID();
-
cohortEntry.updateLastAccessTime();
if(currentCohortEntry != null) {
if(log.isDebugEnabled()) {
log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
- name, currentCohortEntry.getTransactionID(), transactionID);
+ name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
}
return;
if(log.isDebugEnabled()) {
log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
- transactionID);
+ cohortEntry.getTransactionID());
}
}
}
* @param sender the actor to which to send the response
* @param shard the transaction's shard actor
*/
- void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+ void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
// Lookup the cohort entry that was cached previously (or should have been) by
// transactionReady (via the ForwardedReadyTransaction message).
final CohortEntry cohortEntry = cohortCache.get(transactionID);
* @param shard the transaction's shard actor
* @return true if the transaction was successfully prepared, false otherwise.
*/
- boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+ boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
return doCommit(cohortEntry);
}
- void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+ void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
- cohortEntry.getClientVersion(), ""));
+ cohortEntry.getClientVersion()));
}
return newModifications.getLast();
last.setTotalMessagesSent(newModifications.size());
messages.addAll(newModifications);
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion()));
}
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion()));
}
* @return the current CohortEntry or null if the given transaction ID does not match the
* current entry.
*/
- CohortEntry getCohortEntryIfCurrent(String transactionID) {
+ CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
if(isCurrentTransaction(transactionID)) {
return currentCohortEntry;
}
return currentCohortEntry;
}
- CohortEntry getAndRemoveCohortEntry(String transactionID) {
+ CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
return cohortCache.remove(transactionID);
}
- boolean isCurrentTransaction(String transactionID) {
+ boolean isCurrentTransaction(Identifier transactionID) {
return currentCohortEntry != null &&
currentCohortEntry.getTransactionID().equals(transactionID);
}
* @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
* the cache.
*/
- void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+ void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
if(removeCohortEntry) {
cohortCache.remove(transactionID);
}
void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
-
- static class CohortEntry {
- enum State {
- PENDING,
- CAN_COMMITTED,
- PRE_COMMITTED,
- COMMITTED,
- ABORTED
- }
-
- private final String transactionID;
- private ShardDataTreeCohort cohort;
- private final ReadWriteShardDataTreeTransaction transaction;
- private RuntimeException lastBatchedModificationsException;
- private ActorRef replySender;
- private Shard shard;
- private boolean doImmediateCommit;
- private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
- private int totalBatchedModificationsReceived;
- private State state = State.PENDING;
- private final short clientVersion;
- private final CompositeDataTreeCohort userCohorts;
-
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
- DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = transactionID;
- this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
- }
-
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
- SchemaContext schema, short clientVersion) {
- this.transactionID = transactionID;
- this.cohort = cohort;
- this.transaction = null;
- this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
- }
-
- void updateLastAccessTime() {
- lastAccessTimer.reset();
- lastAccessTimer.start();
- }
-
- String getTransactionID() {
- return transactionID;
- }
-
- short getClientVersion() {
- return clientVersion;
- }
-
- State getState() {
- return state;
- }
-
- DataTreeCandidate getCandidate() {
- return cohort.getCandidate();
- }
-
- DataTreeModification getDataTreeModification() {
- return cohort.getDataTreeModification();
- }
-
- ReadWriteShardDataTreeTransaction getTransaction() {
- return transaction;
- }
-
- int getTotalBatchedModificationsReceived() {
- return totalBatchedModificationsReceived;
- }
-
- RuntimeException getLastBatchedModificationsException() {
- return lastBatchedModificationsException;
- }
-
- void applyModifications(Iterable<Modification> modifications) {
- totalBatchedModificationsReceived++;
- if(lastBatchedModificationsException == null) {
- for (Modification modification : modifications) {
- try {
- modification.apply(transaction.getSnapshot());
- } catch (RuntimeException e) {
- lastBatchedModificationsException = e;
- throw e;
- }
- }
- }
- }
-
- boolean canCommit() throws InterruptedException, ExecutionException {
- state = State.CAN_COMMITTED;
-
- // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
- // about possibly accessing our state on a different thread outside of our dispatcher.
- // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
- // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
- // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
- return cohort.canCommit().get();
- }
-
-
-
- void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.PRE_COMMITTED;
- cohort.preCommit().get();
- userCohorts.canCommit(cohort.getCandidate());
- userCohorts.preCommit();
- }
-
- void commit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.COMMITTED;
- cohort.commit().get();
- userCohorts.commit();
- }
-
- void abort() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.ABORTED;
- cohort.abort().get();
- userCohorts.abort();
- }
-
- void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
- Preconditions.checkState(cohort == null, "cohort was already set");
-
- setDoImmediateCommit(doImmediateCommit);
-
- cohort = transaction.ready();
-
- if(cohortDecorator != null) {
- // Call the hook for unit tests.
- cohort = cohortDecorator.decorate(transactionID, cohort);
- }
- }
-
- boolean isReadyToCommit() {
- return replySender != null;
- }
-
- boolean isExpired(long expireTimeInMillis) {
- return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
- }
-
- boolean isDoImmediateCommit() {
- return doImmediateCommit;
- }
-
- void setDoImmediateCommit(boolean doImmediateCommit) {
- this.doImmediateCommit = doImmediateCommit;
- }
-
- ActorRef getReplySender() {
- return replySender;
- }
-
- void setReplySender(ActorRef replySender) {
- this.replySender = replySender;
- }
-
- Shard getShard() {
- return shard;
- }
-
- void setShard(Shard shard) {
- this.shard = shard;
- }
-
-
- boolean isAborted() {
- return state == State.ABORTED;
- }
-
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
- .append(doImmediateCommit).append("]");
- return builder.toString();
- }
- }
}