Address review comments
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
index c53375919396a7628386d8ae81f472f2d3e90fdc..5b7f289eed4ea2fa37296a07b5e47e54af2201ac 100644 (file)
@@ -10,22 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.io.File;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
-import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 
 /**
@@ -41,71 +30,51 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private final ShardDataTree store;
     private final String shardName;
     private final Logger log;
-    private PruningDataTreeModification transaction;
-    private int size;
     private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(ShardDataTree store,  byte[] restoreFromSnapshot, String shardName, Logger log) {
+    private boolean open;
+
+    ShardRecoveryCoordinator(final ShardDataTree store,  final byte[] restoreFromSnapshot, final String shardName, final Logger log) {
         this.store = Preconditions.checkNotNull(store);
-        this.restoreFromSnapshot = restoreFromSnapshot;
         this.shardName = Preconditions.checkNotNull(shardName);
         this.log = Preconditions.checkNotNull(log);
+
+        this.restoreFromSnapshot = restoreFromSnapshot;
     }
 
     @Override
-    public void startLogRecoveryBatch(int maxBatchSize) {
+    public void startLogRecoveryBatch(final int maxBatchSize) {
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
-        transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(),
-            store.getSchemaContext());
-        size = 0;
+        open = true;
     }
 
     @Override
-    public void appendRecoveredLogEntry(Payload payload) {
-        Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+    public void appendRecoveredLogEntry(final Payload payload) {
+        Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
 
         try {
-            if (payload instanceof DataTreeCandidateSupplier) {
-                final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
-                        ((DataTreeCandidateSupplier)payload).getCandidate();
-
-                DataTreeCandidates.applyToModification(transaction, e.getValue());
-                size++;
-
-                if (e.getKey().isPresent()) {
-                    // FIXME: BUG-5280: propagate transaction state
-                }
-            } else {
-                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
-            }
-        } catch (IOException e) {
-            log.error("{}: Error extracting payload", shardName, e);
+            store.applyRecoveryPayload(payload);
+        } catch (Exception e) {
+            log.error("{}: failed to apply payload {}", shardName, payload, e);
+            throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+                shardName, payload), e);
         }
     }
 
-    private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
-        store.commit(tx.getResultingModification());
-    }
-
     /**
      * Applies the current batched log entries to the data store.
      */
     @Override
     public void applyCurrentLogRecoveryBatch() {
-        Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+        Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+        open = false;
+    }
 
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
-        try {
-            commitTransaction(transaction);
-        } catch (Exception e) {
-            File file = new File(System.getProperty("karaf.data", "."),
-                    "failed-recovery-batch-" + shardName + ".out");
-            DataTreeModificationOutput.toFile(file, transaction.getResultingModification());
-            throw new RuntimeException(String.format(
-                    "%s: Failed to apply recovery batch. Modification data was written to file %s",
-                    shardName, file), e);
-        }
-        transaction = null;
+    private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+        final File file = new File(System.getProperty("karaf.data", "."),
+            "failed-recovery-" + kind + "-" + shardName + ".xml");
+        NormalizedNodeXMLOutput.toFile(file, node);
+        return file;
     }
 
     /**
@@ -120,26 +89,18 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
         final ShardDataTreeSnapshot snapshot;
         try {
             snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (IOException e) {
-            log.error("{}: failed to deserialize snapshot", e);
+        } catch (Exception e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
             throw Throwables.propagate(e);
         }
 
-        final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
-                store.getDataTree(), store.getSchemaContext());
-
-        final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
-        tx.write(YangInstanceIdentifier.EMPTY, node);
-
         try {
-            commitTransaction(tx);
+            store.applyRecoverySnapshot(snapshot);
         } catch (Exception e) {
-            File file = new File(System.getProperty("karaf.data", "."),
-                    "failed-recovery-snapshot-" + shardName + ".xml");
-            NormalizedNodeXMLOutput.toFile(file, node);
-            throw new RuntimeException(String.format(
-                    "%s: Failed to apply recovery snapshot. Node data was written to file %s",
-                    shardName, file), e);
+            final File f = writeRoot("snapshot", snapshot.getRootNode().orElse(null));
+            throw new IllegalStateException(String.format(
+                    "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
+                    shardName, snapshot, f), e);
         }
     }