ready.getTransactionID(), ready.getTxnClientVersion());
ShardDataTreeCohort cohort = ready.getTransaction().ready();
ready.getTransactionID(), ready.getTxnClientVersion());
ShardDataTreeCohort cohort = ready.getTransaction().ready();
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
cohortCache.put(ready.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(),
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID(),
void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ DataStoreVersions.CURRENT_VERSION);
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
cohortCache.put(message.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
- canCommit ? CanCommitTransactionReply.YES.toSerializable() :
- CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+ canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+ CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
private boolean aborted;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private int totalBatchedModificationsReceived;
private boolean aborted;