- commitCoordinator.handleCanCommit(canCommit, getSender(), self());
- }
-
- private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
- ready.getTransactionID(), ready.getTxnClientVersion());
-
- // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
- // commitCoordinator in preparation for the subsequent three phase commit initiated by
- // the front-end.
- commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- ready.getModification());
-
- // Return our actor path as we'll handle the three phase commit, except if the Tx client
- // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
- // node. In that case, the subsequent 3-phase commit messages won't contain the
- // transactionId so to maintain backwards compatibility, we create a separate cohort actor
- // to provide the compatible behavior.
- ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ }
+
+ private void noLeaderError(Object message) {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+
+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modifications transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ commitCoordinator.handleBatchedModifications(batched, getSender(), this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ noLeaderError(batched);
+ }