- void appendRecoveredLogPayload(Payload payload) {
- try {
- if(payload instanceof ModificationPayload) {
- currentLogRecoveryBatch.add((ModificationPayload) payload);
- } else if (payload instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationPayload) payload).getModification())));
- } else if (payload instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationByteStringPayload) payload).getModification())));
- } else {
- log.error("{}: Unknown payload {} received during recovery", shardName, payload);
- }
- } catch (IOException e) {
- log.error("{}: Error extracting ModificationPayload", shardName, e);
- }
-
+ @Override
+ public void startLogRecoveryBatch(final int maxBatchSize) {
+ log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+ open = true;