import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
ready.getTransactionID(), ready.getTxnClientVersion());
ShardDataTreeCohort cohort = ready.getTransaction().ready();
- CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+ CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
}
- if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
- // Return our actor path as we'll handle the three phase commit except if the Tx client
- // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
- // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
- // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
- ActorRef replyActorPath = shard.self();
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
- replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
- }
-
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
- ready.getTxnClientVersion());
- sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, shard.self());
+ if(ready.isDoImmediateCommit()) {
+ cohortEntry.setDoImmediateCommit(true);
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
} else {
- if(ready.isDoImmediateCommit()) {
- cohortEntry.setDoImmediateCommit(true);
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
- handleCanCommit(cohortEntry);
- } else {
- // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
- // front-end so send back a ReadyTransactionReply with our actor path.
- sender.tell(readyTransactionReply(shard), shard.self());
- }
+ // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+ // front-end so send back a ReadyTransactionReply with our actor path.
+ sender.tell(readyTransactionReply(shard), shard.self());
}
}
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(),
- batched.getTransactionChainID()));
+ batched.getTransactionChainID()), batched.getVersion());
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
}
}
+ Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+ final int maxModificationsPerBatch) {
+ CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+ if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+ return Collections.singletonList(from);
+ }
+
+ cohortEntry.applyModifications(from.getModifications());
+
+ final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ if(newModifications.isEmpty() ||
+ newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ newModifications.add(new BatchedModifications(from.getTransactionID(),
+ from.getVersion(), from.getTransactionChainID()));
+ }
+
+ return newModifications.getLast();
+ }
+ });
+
+ BatchedModifications last = newModifications.getLast();
+ last.setDoCommitOnReady(from.isDoCommitOnReady());
+ last.setReady(from.isReady());
+ last.setTotalMessagesSent(newModifications.size());
+ return newModifications;
+ }
+
private void handleCanCommit(CohortEntry cohortEntry) {
String transactionID = cohortEntry.getTransactionID();
"Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
}
} else {
+ // FIXME - use caller's version
cohortEntry.getReplySender().tell(
- canCommit ? CanCommitTransactionReply.YES.toSerializable() :
- CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+ canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+ CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
}
} catch (Exception e) {
log.debug("{}: An exception occurred during canCommit", name, e);
shard.getShardMBean().incrementAbortTransactionsCount();
if(sender != null) {
- sender.tell(new AbortTransactionReply().toSerializable(), self);
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
} catch (Exception e) {
log.error("{}: An exception happened during abort", name, e);
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
private boolean aborted;
+ private final short clientVersion;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
+ this.clientVersion = clientVersion;
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
this.transactionID = transactionID;
this.cohort = cohort;
this.transaction = null;
+ this.clientVersion = clientVersion;
}
void updateLastAccessTime() {
return transactionID;
}
+ short getClientVersion() {
+ return clientVersion;
+ }
+
DataTreeCandidate getCandidate() {
return cohort.getCandidate();
}
+ ReadWriteShardDataTreeTransaction getTransaction() {
+ return transaction;
+ }
+
int getTotalBatchedModificationsReceived() {
return totalBatchedModificationsReceived;
}