Bug 2847: Recovery of a large journal runs out of memory 40/16540/6
author Tom Pantelis <tpanteli@brocade.com>
Fri, 13 Mar 2015 23:57:47 +0000 (19:57 -0400)
committer Tom Pantelis <tpanteli@brocade.com>
Wed, 25 Mar 2015 22:47:32 +0000 (18:47 -0400)
Changed the ShardRecoveryCoordinator to commit each batch of log
entries immediately instead of using an executor to prepare write
transactions in parallel and then commit them all on recovery complete
(see the sketch below). The previous approach incurred a lot of memory
overhead for little to no gain. I also changed it to cache the
serialized ModificationPayload instances instead of de-serializing them
immediately - this further reduces the memory footprint, as the
serialized instances are much smaller.

As a result, all of the recovery code is now in the
ShardRecoveryCoordinator - Shard is essentially a pass-through.

I also reduced the shardJournalRecoveryLogBatchSize default to 1000 to
further shrink the memory footprint during recovery.
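
For reference, the per-batch commit now performed during recovery boils
down to the following sketch (simplified from the new
ShardRecoveryCoordinator in the diff below; logging and error handling
elided):

    // Payloads stay serialized in the batch until commit time, which
    // keeps the cached batch small. Each batch is applied and committed
    // immediately, so at most one batch is live at a time.
    DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
    for (ModificationPayload payload : currentLogRecoveryBatch) {
        MutableCompositeModification.fromSerializable(
                payload.getModification()).apply(writeTx);
    }
    DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
    cohort.preCommit().get();
    cohort.commit().get();
    currentLogRecoveryBatch = null; // release the batch for GC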

Change-Id: I3aaabe52781bc0db14975e0a292ef9fd18aa3d7c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index 7df398355e0ab0cb2c9e96329652c69261910857..cfbf9450aa2b77661f21bd00904a995b48c9c3d7 100644 (file)
@@ -35,8 +35,8 @@ operational.persistent=false
 # failing an operation (eg transaction create and change listener registration).
 #shard-initialization-timeout-in-seconds=300
 
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
 
 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
 #shard-snapshot-batch-count=20000
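
For reference, a deployment that needs a different recovery batch size
can uncomment and override the property in datastore.cfg; the value
below is a hypothetical example, not a recommendation:

    shard-journal-recovery-log-batch-size=2000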
index a30b6f7516981411e589577f3fda9a9a0f9bc887..8e00a1389ca6a057bef39292ffcc0b04958be794 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,7 +139,6 @@ public class Shard extends RaftActor {
      * Coordinates persistence recovery on startup.
      */
     private ShardRecoveryCoordinator recoveryCoordinator;
-    private List<Object> currentLogRecoveryBatch;
 
     private final DOMTransactionFactory transactionFactory;
 
@@ -190,6 +188,8 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -713,81 +713,27 @@ public class Shard extends RaftActor {
     @Override
     protected
     void startLogRecoveryBatch(final int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
-        }
+        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
     }
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if(data instanceof ModificationPayload) {
-            try {
-                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
-            } catch (ClassNotFoundException | IOException e) {
-                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
-            }
-        } else if (data instanceof CompositeModificationPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
-        } else if (data instanceof CompositeModificationByteStringPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
-        } else {
-            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
-        }
+        recoveryCoordinator.appendRecoveredLogPayload(data);
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
-        }
+        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
-                    currentLogRecoveryBatch.size());
-        }
+        recoveryCoordinator.applyCurrentLogRecoveryBatch();
     }
 
     @Override
     protected void onRecoveryComplete() {
-        if(recoveryCoordinator != null) {
-            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
-            }
-
-            for(DOMStoreWriteTransaction tx: txList) {
-                try {
-                    syncCommitTransaction(tx);
-                    shardMBean.incrementCommittedTransactionCount();
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to commit", persistenceId(), e);
-                }
-            }
-        }
-
         recoveryCoordinator = null;
-        currentLogRecoveryBatch = null;
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
index 50528575e77123ce433bf0b4e6ab73f0077c39c5..7e547d7257dc495dc34a796d1c7b7a0944cb0e76 100644 (file)
@@ -8,19 +8,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -34,115 +34,86 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator {
 
-    private static final int TIME_OUT = 10;
-
-    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
-    private final SchemaContext schemaContext;
+    private final InMemoryDOMDataStore store;
+    private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
-    private final ExecutorService executor;
     private final Logger log;
-    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
-            String name) {
-        this.schemaContext = schemaContext;
+    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+        this.store = store;
         this.shardName = shardName;
         this.log = log;
-        this.name = name;
-
-        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
     }
 
-    /**
-     * Submits a batch of journal log entries.
-     *
-     * @param logEntries the serialized journal log entries
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
-    }
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
-    /**
-     * Submits a snapshot.
-     *
-     * @param snapshotBytes the serialized snapshot
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    Collection<DOMStoreWriteTransaction> getTransactions() {
-        // Shutdown the executor and wait for task completion.
-        executor.shutdown();
-
+    void appendRecoveredLogPayload(Payload payload) {
         try {
-            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
-                return resultingTxList;
+            if(payload instanceof ModificationPayload) {
+                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            } else if (payload instanceof CompositeModificationPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationPayload) payload).getModification())));
+            } else if (payload instanceof CompositeModificationByteStringPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification())));
             } else {
-                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
 
-        return Collections.emptyList();
     }
 
-    private static abstract class ShardRecoveryTask implements Runnable {
-
-        final DOMStoreWriteTransaction resultingTx;
-
-        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
-            this.resultingTx = resultingTx;
+    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        try {
+            commitCohort.preCommit().get();
+            commitCohort.commit().get();
+        } catch (Exception e) {
+            log.error("{}: Failed to commit Tx on recovery", shardName, e);
         }
     }
 
-    private class LogRecoveryTask extends ShardRecoveryTask {
-
-        private final List<Object> logEntries;
-
-        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.logEntries = logEntries;
-        }
-
-        @Override
-        public void run() {
-            for(int i = 0; i < logEntries.size(); i++) {
-                MutableCompositeModification.fromSerializable(
-                        logEntries.get(i)).apply(resultingTx);
-                // Null out to GC quicker.
-                logEntries.set(i, null);
+    /**
+     * Applies the current batched log entries to the data store.
+     */
+    void applyCurrentLogRecoveryBatch() {
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        for(ModificationPayload payload: currentLogRecoveryBatch) {
+            try {
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+            } catch (Exception e) {
+                log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
         }
-    }
 
-    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+        commitTransaction(writeTx);
+
+        currentLogRecoveryBatch = null;
+    }
 
-        private final byte[] snapshotBytes;
+    /**
+     * Applies a recovered snapshot to the data store.
+     *
+     * @param snapshotBytes the serialized snapshot
+     */
+    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+        log.debug("{}: Applyng recovered sbapshot", shardName);
 
-        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.snapshotBytes = snapshotBytes;
-        }
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
 
-        @Override
-        public void run() {
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-            // delete everything first
-            resultingTx.delete(YangInstanceIdentifier.builder().build());
+        writeTx.write(YangInstanceIdentifier.builder().build(), node);
 
-            // Add everything from the remote node back
-            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-        }
+        commitTransaction(writeTx);
     }
 }
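
For context, a minimal sketch of the call sequence Shard now drives
against the coordinator during recovery (one startLogRecoveryBatch /
applyCurrentLogRecoveryBatch cycle per journal batch; the actual
invocations happen via the RaftActor recovery callbacks shown in the
Shard.java diff above):

    ShardRecoveryCoordinator coordinator =
            new ShardRecoveryCoordinator(store, persistenceId(), LOG);
    coordinator.applyRecoveredSnapshot(snapshotBytes); // if a snapshot was persisted
    coordinator.startLogRecoveryBatch(maxBatchSize);   // begin an empty batch
    coordinator.appendRecoveredLogPayload(payload);    // once per recovered journal entry
    coordinator.applyCurrentLogRecoveryBatch();        // apply + commit, then next batch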
index b775cf0a9914be9ed37f8595bbafeb662e57321e..dc83af9a756374694e6203c61eebc49d72e214fc 100644 (file)
@@ -124,7 +124,7 @@ module distributed-datastore-provider {
          }
 
          leaf shard-journal-recovery-log-batch-size {
-            default 5000;
+            default 1000;
             type non-zero-uint32-type;
             description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
          }