- protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
- }
-
- @Override
- protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }