- // TODO: what if this is not the first batch and leadership changed in between batched messages?
- // We could check if the commitCoordinator already has a cached entry and forward all the previous
- // batched modifications.
- LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
- leader.forward(batched, getContext());
+ // If this is not the first batch and leadership changed in between batched messages,
+ // we need to reconstruct previous BatchedModifications from the transaction
+ // DataTreeModification, honoring the max batched modification count, and forward all the
+ // previous BatchedModifications to the new leader.
+ Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+ batched, datastoreContext.getShardBatchedModificationCount());
+
+ LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+ newModifications.size(), leader);
+
+ for(BatchedModifications bm: newModifications) {
+ leader.forward(bm, getContext());
+ }