X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=8e00a1389ca6a057bef39292ffcc0b04958be794;hp=a30b6f7516981411e589577f3fda9a9a0f9bc887;hb=07267ef7e2e6d46c3a514a978a8dbb266966176c;hpb=e6e3ad00c95d72f731142e649f198c5ded9dde64 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a30b6f7516..8e00a1389c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -140,7 +139,6 @@ public class Shard extends RaftActor { * Coordinates persistence recovery on startup. */ private ShardRecoveryCoordinator recoveryCoordinator; - private List currentLogRecoveryBatch; private final DOMTransactionFactory transactionFactory; @@ -190,6 +188,8 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + + recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG); } private void setTransactionCommitTimeout() { @@ -713,81 +713,27 @@ public class Shard extends RaftActor { @Override protected void startLogRecoveryBatch(final int maxBatchSize) { - currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize); - } + recoveryCoordinator.startLogRecoveryBatch(maxBatchSize); } @Override protected void appendRecoveredLogEntry(final Payload data) { - if(data instanceof ModificationPayload) { - try { - currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); - } catch (ClassNotFoundException | IOException e) { - LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); - } - } else if (data instanceof CompositeModificationPayload) { - currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); - } else if (data instanceof CompositeModificationByteStringPayload) { - currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); - } else { - LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data); - } + recoveryCoordinator.appendRecoveredLogPayload(data); } @Override protected void applyRecoverySnapshot(final byte[] snapshotBytes) { - if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, - LOG, name.toString()); - } - - recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: submitted recovery sbapshot", persistenceId()); - } + recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes); } @Override protected void applyCurrentLogRecoveryBatch() { - if(recoveryCoordinator == null) { - recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext, - LOG, name.toString()); - } - - recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(), - currentLogRecoveryBatch.size()); - } + recoveryCoordinator.applyCurrentLogRecoveryBatch(); } @Override protected void onRecoveryComplete() { - if(recoveryCoordinator != null) { - Collection txList = recoveryCoordinator.getTransactions(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size()); - } - - for(DOMStoreWriteTransaction tx: txList) { - try { - syncCommitTransaction(tx); - shardMBean.incrementCommittedTransactionCount(); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to commit", persistenceId(), e); - } - } - } - recoveryCoordinator = null; - currentLogRecoveryBatch = null; //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf());