import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
messageRetrySupport.addMessageToRetry(batched, getSender(),
"Could not commit transaction " + batched.getTransactionID());
} else {
- // TODO: what if this is not the first batch and leadership changed in between batched messages?
- // We could check if the commitCoordinator already has a cached entry and forward all the previous
- // batched modifications.
- LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
- leader.forward(batched, getContext());
+ // If this is not the first batch and leadership changed in between batched messages,
+ // we need to reconstruct previous BatchedModifications from the transaction
+ // DataTreeModification, honoring the max batched modification count, and forward all the
+ // previous BatchedModifications to the new leader.
+ Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+ batched, datastoreContext.getShardBatchedModificationCount());
+
+ LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+ newModifications.size(), leader);
+
+ for(BatchedModifications bm: newModifications) {
+ leader.forward(bm, getContext());
+ }
}
}
}