From 07267ef7e2e6d46c3a514a978a8dbb266966176c Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Fri, 13 Mar 2015 19:57:47 -0400
Subject: [PATCH] Bug 2847: Recovery of a large journal runs out of memory

Changed the ShardRecoveryCoordinator to commit each batch of log entries
immediately instead of using an executor to prepare write transactions in
parallel and then commit them all when recovery completes. The previous
approach incurred a lot of memory overhead for little to no gain. I also
changed it to cache the serialized ModificationPayload instances instead of
de-serializing them immediately - this further reduces the memory footprint,
as the serialized instances are much smaller.

As a result, all of the recovery code is now in the ShardRecoveryCoordinator -
Shard is essentially a pass-through. I also reduced the default
shardJournalRecoveryLogBatchSize to 1000 to further reduce the memory
footprint.

Change-Id: I3aaabe52781bc0db14975e0a292ef9fd18aa3d7c
Signed-off-by: Tom Pantelis
---
 .../src/main/resources/initial/datastore.cfg  |   4 +-
 .../controller/cluster/datastore/Shard.java   |  66 +-------
 .../datastore/ShardRecoveryCoordinator.java   | 149 +++++++-----------
 .../yang/distributed-datastore-provider.yang  |   2 +-
 4 files changed, 69 insertions(+), 152 deletions(-)

diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
index 7df398355e..cfbf9450aa 100644
--- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
+++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
@@ -35,8 +35,8 @@ operational.persistent=false
 # failing an operation (eg transaction create and change listener registration).
 #shard-initialization-timeout-in-seconds=300
 
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
 
 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
 #shard-snapshot-batch-count=20000
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index a30b6f7516..8e00a1389c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,7 +139,6 @@ public class Shard extends RaftActor {
      * Coordinates persistence recovery on startup.
      */
     private ShardRecoveryCoordinator recoveryCoordinator;
 
-    private List currentLogRecoveryBatch;
 
     private final DOMTransactionFactory transactionFactory;
 
@@ -190,6 +188,8 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -713,81 +713,27 @@ public class Shard extends RaftActor {
 
     @Override
     protected void startLogRecoveryBatch(final int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
-        }
+        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
     }
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if(data instanceof ModificationPayload) {
-            try {
-                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
-            } catch (ClassNotFoundException | IOException e) {
-                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
-            }
-        } else if (data instanceof CompositeModificationPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
-        } else if (data instanceof CompositeModificationByteStringPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
-        } else {
-            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
-        }
+        recoveryCoordinator.appendRecoveredLogPayload(data);
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
-        }
+        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
    }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
-                    currentLogRecoveryBatch.size());
-        }
+        recoveryCoordinator.applyCurrentLogRecoveryBatch();
     }
 
     @Override
     protected void onRecoveryComplete() {
-        if(recoveryCoordinator != null) {
-            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
-            }
-
-            for(DOMStoreWriteTransaction tx: txList) {
-                try {
-                    syncCommitTransaction(tx);
-                    shardMBean.incrementCommittedTransactionCount();
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to commit", persistenceId(), e);
-                }
-            }
-        }
-        recoveryCoordinator = null;
-        currentLogRecoveryBatch = null;
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
index 50528575e7..7e547d7257 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
@@ -8,19 +8,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -34,115 +34,86 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator {
 
-    private static final int TIME_OUT = 10;
-
-    private final List resultingTxList = Lists.newArrayList();
-    private final SchemaContext schemaContext;
+    private final InMemoryDOMDataStore store;
+    private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
-    private final ExecutorService executor;
     private final Logger log;
-    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
-            String name) {
-        this.schemaContext = schemaContext;
+    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+        this.store = store;
         this.shardName = shardName;
         this.log = log;
-        this.name = name;
-
-        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
     }
 
-    /**
-     * Submits a batch of journal log entries.
-     *
-     * @param logEntries the serialized journal log entries
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(List logEntries, DOMStoreWriteTransaction resultingTx) {
-        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
-    }
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
-    /**
-     * Submits a snapshot.
-     *
-     * @param snapshotBytes the serialized snapshot
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    Collection getTransactions() {
-        // Shutdown the executor and wait for task completion.
-        executor.shutdown();
-
+    void appendRecoveredLogPayload(Payload payload) {
         try {
-            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES)) {
-                return resultingTxList;
+            if(payload instanceof ModificationPayload) {
+                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            } else if (payload instanceof CompositeModificationPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationPayload) payload).getModification())));
+            } else if (payload instanceof CompositeModificationByteStringPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification())));
             } else {
-                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
-        return Collections.emptyList();
     }
 
-    private static abstract class ShardRecoveryTask implements Runnable {
-
-        final DOMStoreWriteTransaction resultingTx;
-
-        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
-            this.resultingTx = resultingTx;
+    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        try {
+            commitCohort.preCommit().get();
+            commitCohort.commit().get();
+        } catch (Exception e) {
+            log.error("{}: Failed to commit Tx on recovery", shardName, e);
         }
     }
 
-    private class LogRecoveryTask extends ShardRecoveryTask {
-
-        private final List logEntries;
-
-        LogRecoveryTask(List logEntries, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.logEntries = logEntries;
-        }
-
-        @Override
-        public void run() {
-            for(int i = 0; i < logEntries.size(); i++) {
-                MutableCompositeModification.fromSerializable(
-                        logEntries.get(i)).apply(resultingTx);
-                // Null out to GC quicker.
-                logEntries.set(i, null);
+    /**
+     * Applies the current batched log entries to the data store.
+     */
+    void applyCurrentLogRecoveryBatch() {
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        for(ModificationPayload payload: currentLogRecoveryBatch) {
+            try {
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+            } catch (Exception e) {
+                log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
         }
-    }
 
-    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+        commitTransaction(writeTx);
+
+        currentLogRecoveryBatch = null;
+    }
 
-        private final byte[] snapshotBytes;
+    /**
+     * Applies a recovered snapshot to the data store.
+     *
+     * @param snapshotBytes the serialized snapshot
+     */
+    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.snapshotBytes = snapshotBytes;
-        }
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
 
-        @Override
-        public void run() {
-            NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-            // delete everything first
-            resultingTx.delete(YangInstanceIdentifier.builder().build());
+        writeTx.write(YangInstanceIdentifier.builder().build(), node);
 
-            // Add everything from the remote node back
-            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-        }
+        commitTransaction(writeTx);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
index b775cf0a99..dc83af9a75 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
@@ -124,7 +124,7 @@ module distributed-datastore-provider {
         }
 
         leaf shard-journal-recovery-log-batch-size {
-            default 5000;
+            default 1000;
            type non-zero-uint32-type;
            description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
         }
-- 
2.36.6
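
For illustration only - the following minimal sketch is not part of the patch. It shows how a caller such as Shard's RaftActor recovery callbacks would drive the ShardRecoveryCoordinator API introduced above. Only the ShardRecoveryCoordinator constructor and method signatures come from the diff; the example class, its method, its parameters, and the shard name are hypothetical.

// Hypothetical sketch - not part of this patch. Placed in the same package because
// ShardRecoveryCoordinator and its methods are package-private.
package org.opendaylight.controller.cluster.datastore;

import java.util.List;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShardRecoveryExample {
    private static final Logger LOG = LoggerFactory.getLogger(ShardRecoveryExample.class);

    // The store, snapshot bytes and journal payloads stand in for values the Shard
    // already holds during recovery; they are assumptions here.
    void replay(InMemoryDOMDataStore store, byte[] snapshotBytes, List<Payload> journalBatch) {
        ShardRecoveryCoordinator coordinator =
                new ShardRecoveryCoordinator(store, "example-shard", LOG);

        // A recovered snapshot, if present, is applied and committed first.
        if (snapshotBytes != null) {
            coordinator.applyRecoveredSnapshot(snapshotBytes);
        }

        // Journal entries are appended to a batch, and each batch is committed to the
        // data store immediately, which is what bounds memory use during recovery.
        coordinator.startLogRecoveryBatch(journalBatch.size());
        for (Payload payload : journalBatch) {
            coordinator.appendRecoveredLogPayload(payload);
        }
        coordinator.applyCurrentLogRecoveryBatch();
    }
}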