//
if(isLeader()) {
try {
- BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
- sender().tell(reply, self());
+ boolean ready = commitCoordinator.handleTransactionModifications(batched);
+ if(ready) {
+ sender().tell(READY_TRANSACTION_REPLY, self());
+ } else {
+ sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+ }
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ ActorRef replyActorPath = getSelf();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
-
+ readyTransactionReply, getSelf());
} else {
-
- getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
- READY_TRANSACTION_REPLY, getSelf());
+ getSender().tell(READY_TRANSACTION_REPLY, getSelf());
}
}