import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
* Coordinates persistence recovery on startup.
*/
private ShardRecoveryCoordinator recoveryCoordinator;
- private List<Object> currentLogRecoveryBatch;
private final DOMTransactionFactory transactionFactory;
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+ recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
@Override
protected
void startLogRecoveryBatch(final int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
- }
+ recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
}
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if(data instanceof ModificationPayload) {
- try {
- currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
- } else if (data instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
- } else if (data instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
- } else {
- LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
- }
+ recoveryCoordinator.appendRecoveredLogPayload(data);
}
@Override
protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted recovery sbapshot", persistenceId());
- }
+ recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
}
@Override
protected void applyCurrentLogRecoveryBatch() {
- if(recoveryCoordinator == null) {
- recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
- LOG, name.toString());
- }
-
- recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
- currentLogRecoveryBatch.size());
- }
+ recoveryCoordinator.applyCurrentLogRecoveryBatch();
}
@Override
protected void onRecoveryComplete() {
- if(recoveryCoordinator != null) {
- Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
- }
-
- for(DOMStoreWriteTransaction tx: txList) {
- try {
- syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
- }
-
recoveryCoordinator = null;
- currentLogRecoveryBatch = null;
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());