Merge "IMDS: trim down commit overhead"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
index f9d305001567ad4b47706169de873ffafb36fb47..797641978d2cd47cc7eed57c12e77e2334cb943c 100644 (file)
@@ -7,9 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -17,10 +15,10 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
@@ -36,52 +34,55 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
-    private final ShardDataTree store;
-    private List<ModificationPayload> currentLogRecoveryBatch;
+    private final DataTree store;
     private final String shardName;
     private final Logger log;
+    private DataTreeModification transaction;
+    private int size;
 
     ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
-        this.store = store;
+        this.store = store.getDataTree();
         this.shardName = shardName;
         this.log = log;
     }
 
     @Override
     public void startLogRecoveryBatch(int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        transaction = store.takeSnapshot().newModification();
+        size = 0;
     }
 
     @Override
     public void appendRecoveredLogEntry(Payload payload) {
         try {
-            if(payload instanceof ModificationPayload) {
-                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            if (payload instanceof DataTreeCandidatePayload) {
+                DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+                size++;
+            } else if (payload instanceof ModificationPayload) {
+                MutableCompositeModification.fromSerializable(
+                    ((ModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                    ((CompositeModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationByteStringPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationByteStringPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+                size++;
             } else {
                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (IOException e) {
+        } catch (IOException | ClassNotFoundException e) {
             log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
-
     }
 
-    private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
-        try {
-            commitCohort.preCommit().get();
-            commitCohort.commit().get();
-        } catch (Exception e) {
-            log.error("{}: Failed to commit Tx on recovery", shardName, e);
-        }
+    private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException {
+        tx.ready();
+        store.validate(tx);
+        store.commit(store.prepare(tx));
     }
 
     /**
@@ -89,21 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyCurrentLogRecoveryBatch() {
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
-
-        ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
-        DataTreeModification snapshot = writeTx.getSnapshot();
-        for (ModificationPayload payload : currentLogRecoveryBatch) {
-            try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
-            } catch (Exception e) {
-                log.error("{}: Error extracting ModificationPayload", shardName, e);
-            }
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
+        try {
+            commitTransaction(transaction);
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to apply recovery batch", shardName, e);
         }
-
-        commitTransaction(writeTx);
-
-        currentLogRecoveryBatch = null;
+        transaction = null;
     }
 
     /**
@@ -115,19 +108,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applying recovered snapshot", shardName);
 
-        // Intentionally bypass normal transaction to side-step persistence/replication
-        final DataTree tree = store.getDataTree();
-        DataTreeModification writeTx = tree.takeSnapshot().newModification();
-
-        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
-        writeTx.write(ROOT, node);
-        writeTx.ready();
+        final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        final DataTreeModification tx = store.takeSnapshot().newModification();
+        tx.write(ROOT, node);
         try {
-            tree.validate(writeTx);
-            tree.commit(tree.prepare(writeTx));
+            commitTransaction(tx);
         } catch (DataValidationFailedException e) {
-            log.error("{}: Failed to validate recovery snapshot", shardName, e);
+            log.error("{}: Failed to apply recovery snapshot", shardName, e);
         }
     }
 }