Merge "Remove RaftActor#trimPersistentData method"
authorMoiz Raja <moraja@cisco.com>
Fri, 27 Mar 2015 10:15:57 +0000 (10:15 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Mar 2015 10:15:57 +0000 (10:15 +0000)
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index 7df398355e0ab0cb2c9e96329652c69261910857..cfbf9450aa2b77661f21bd00904a995b48c9c3d7 100644 (file)
@@ -35,8 +35,8 @@ operational.persistent=false
 # failing an operation (eg transaction create and change listener registration).
 #shard-initialization-timeout-in-seconds=300
 
-# The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
-#shard-journal-recovery-log-batch-size=5000
+# The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
+#shard-journal-recovery-log-batch-size=1000
 
 # The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
 #shard-snapshot-batch-count=20000
index a30b6f7516981411e589577f3fda9a9a0f9bc887..8e00a1389ca6a057bef39292ffcc0b04958be794 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,7 +139,6 @@ public class Shard extends RaftActor {
      * Coordinates persistence recovery on startup.
      */
     private ShardRecoveryCoordinator recoveryCoordinator;
-    private List<Object> currentLogRecoveryBatch;
 
     private final DOMTransactionFactory transactionFactory;
 
@@ -190,6 +188,8 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -713,81 +713,27 @@ public class Shard extends RaftActor {
     @Override
     protected
     void startLogRecoveryBatch(final int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
-        }
+        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
     }
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if(data instanceof ModificationPayload) {
-            try {
-                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
-            } catch (ClassNotFoundException | IOException e) {
-                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
-            }
-        } else if (data instanceof CompositeModificationPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
-        } else if (data instanceof CompositeModificationByteStringPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
-        } else {
-            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
-        }
+        recoveryCoordinator.appendRecoveredLogPayload(data);
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
-        }
+        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
-                    currentLogRecoveryBatch.size());
-        }
+        recoveryCoordinator.applyCurrentLogRecoveryBatch();
     }
 
     @Override
     protected void onRecoveryComplete() {
-        if(recoveryCoordinator != null) {
-            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
-            }
-
-            for(DOMStoreWriteTransaction tx: txList) {
-                try {
-                    syncCommitTransaction(tx);
-                    shardMBean.incrementCommittedTransactionCount();
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to commit", persistenceId(), e);
-                }
-            }
-        }
-
         recoveryCoordinator = null;
-        currentLogRecoveryBatch = null;
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
index 50528575e77123ce433bf0b4e6ab73f0077c39c5..7e547d7257dc495dc34a796d1c7b7a0944cb0e76 100644 (file)
@@ -8,19 +8,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -34,115 +34,86 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator {
 
-    private static final int TIME_OUT = 10;
-
-    private final List<DOMStoreWriteTransaction> resultingTxList = Lists.newArrayList();
-    private final SchemaContext schemaContext;
+    private final InMemoryDOMDataStore store;
+    private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
-    private final ExecutorService executor;
     private final Logger log;
-    private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
-            String name) {
-        this.schemaContext = schemaContext;
+    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+        this.store = store;
         this.shardName = shardName;
         this.log = log;
-        this.name = name;
-
-        executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat("ShardRecovery-" + shardName + "-%d").build());
     }
 
-    /**
-     * Submits a batch of journal log entries.
-     *
-     * @param logEntries the serialized journal log entries
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-        LogRecoveryTask task = new LogRecoveryTask(logEntries, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
-    }
+    void startLogRecoveryBatch(int maxBatchSize) {
+        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
-    /**
-     * Submits a snapshot.
-     *
-     * @param snapshotBytes the serialized snapshot
-     * @param resultingTx the write Tx to which to apply the entries
-     */
-    void submit(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-        SnapshotRecoveryTask task = new SnapshotRecoveryTask(snapshotBytes, resultingTx);
-        resultingTxList.add(resultingTx);
-        executor.execute(task);
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    Collection<DOMStoreWriteTransaction> getTransactions() {
-        // Shutdown the executor and wait for task completion.
-        executor.shutdown();
-
+    void appendRecoveredLogPayload(Payload payload) {
         try {
-            if(executor.awaitTermination(TIME_OUT, TimeUnit.MINUTES))  {
-                return resultingTxList;
+            if(payload instanceof ModificationPayload) {
+                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            } else if (payload instanceof CompositeModificationPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationPayload) payload).getModification())));
+            } else if (payload instanceof CompositeModificationByteStringPayload) {
+                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification())));
             } else {
-                log.error("{}: Recovery for shard {} timed out after {} minutes", name, shardName, TIME_OUT);
+                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
 
-        return Collections.emptyList();
     }
 
-    private static abstract class ShardRecoveryTask implements Runnable {
-
-        final DOMStoreWriteTransaction resultingTx;
-
-        ShardRecoveryTask(DOMStoreWriteTransaction resultingTx) {
-            this.resultingTx = resultingTx;
+    private void commitTransaction(DOMStoreWriteTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        try {
+            commitCohort.preCommit().get();
+            commitCohort.commit().get();
+        } catch (Exception e) {
+            log.error("{}: Failed to commit Tx on recovery", shardName, e);
         }
     }
 
-    private class LogRecoveryTask extends ShardRecoveryTask {
-
-        private final List<Object> logEntries;
-
-        LogRecoveryTask(List<Object> logEntries, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.logEntries = logEntries;
-        }
-
-        @Override
-        public void run() {
-            for(int i = 0; i < logEntries.size(); i++) {
-                MutableCompositeModification.fromSerializable(
-                        logEntries.get(i)).apply(resultingTx);
-                // Null out to GC quicker.
-                logEntries.set(i, null);
+    /**
+     * Applies the current batched log entries to the data store.
+     */
+    void applyCurrentLogRecoveryBatch() {
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        for(ModificationPayload payload: currentLogRecoveryBatch) {
+            try {
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+            } catch (Exception e) {
+                log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
         }
-    }
 
-    private class SnapshotRecoveryTask extends ShardRecoveryTask {
+        commitTransaction(writeTx);
+
+        currentLogRecoveryBatch = null;
+    }
 
-        private final byte[] snapshotBytes;
+    /**
+     * Applies a recovered snapshot to the data store.
+     *
+     * @param snapshotBytes the serialized snapshot
+     */
+    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+        log.debug("{}: Applyng recovered sbapshot", shardName);
 
-        SnapshotRecoveryTask(byte[] snapshotBytes, DOMStoreWriteTransaction resultingTx) {
-            super(resultingTx);
-            this.snapshotBytes = snapshotBytes;
-        }
+        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
 
-        @Override
-        public void run() {
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-            // delete everything first
-            resultingTx.delete(YangInstanceIdentifier.builder().build());
+        writeTx.write(YangInstanceIdentifier.builder().build(), node);
 
-            // Add everything from the remote node back
-            resultingTx.write(YangInstanceIdentifier.builder().build(), node);
-        }
+        commitTransaction(writeTx);
     }
 }
index b775cf0a9914be9ed37f8595bbafeb662e57321e..dc83af9a756374694e6203c61eebc49d72e214fc 100644 (file)
@@ -124,7 +124,7 @@ module distributed-datastore-provider {
          }
 
          leaf shard-journal-recovery-log-batch-size {
-            default 5000;
+            default 1000;
             type non-zero-uint32-type;
             description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
          }