BUG-5280: expand ShardDataTree to cover transaction mechanics
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
index 5c9f0d11c6584378e4c3a9ff4d46fabe597a80ef..c53375919396a7628386d8ae81f472f2d3e90fdc 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
@@ -8,25 +8,24 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -39,25 +38,25 @@ import org.slf4j.Logger;
  * @author Thomas Pantelis
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-    private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
-    private final DataTree store;
+    private final ShardDataTree store;
     private final String shardName;
     private final Logger log;
-    private final Set<URI> validNamespaces;
     private PruningDataTreeModification transaction;
     private int size;
+    private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
-        this.store = store.getDataTree();
-        this.shardName = shardName;
-        this.log = log;
-        this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
+    ShardRecoveryCoordinator(ShardDataTree store, byte[] restoreFromSnapshot, String shardName, Logger log) {
+        this.store = Preconditions.checkNotNull(store);
+        this.restoreFromSnapshot = restoreFromSnapshot;
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.log = Preconditions.checkNotNull(log);
     }
 
     @Override
     public void startLogRecoveryBatch(int maxBatchSize) {
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
-        transaction = new PruningDataTreeModification(store.takeSnapshot().newModification(), validNamespaces);
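+        // The pruning wrapper drops any recovered data whose schema is no longer present
+        // in the shard's current SchemaContext, so stale journal entries do not abort
+        // recovery outright.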
+        transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(),
+            store.getSchemaContext());
         size = 0;
     }
 
@@ -66,34 +65,26 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
         Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
 
         try {
-            if (payload instanceof DataTreeCandidatePayload) {
-                DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
-                size++;
-            } else if (payload instanceof ModificationPayload) {
-                MutableCompositeModification.fromSerializable(
-                    ((ModificationPayload) payload).getModification()).apply(transaction);
-                size++;
-            } else if (payload instanceof CompositeModificationPayload) {
-                MutableCompositeModification.fromSerializable(
-                    ((CompositeModificationPayload) payload).getModification()).apply(transaction);
-                size++;
-            } else if (payload instanceof CompositeModificationByteStringPayload) {
-                MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
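+            // Replayed payloads now expose their effect as a DataTreeCandidate, which is
+            // applied directly to the pruning modification instead of going through the
+            // legacy CompositeModification paths removed above.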
+            if (payload instanceof DataTreeCandidateSupplier) {
+                final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
+                        ((DataTreeCandidateSupplier)payload).getCandidate();
+
+                DataTreeCandidates.applyToModification(transaction, e.getValue());
                 size++;
+
+                if (e.getKey().isPresent()) {
+                    // FIXME: BUG-5280: propagate transaction state
+                }
             } else {
                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (IOException | ClassNotFoundException e) {
-            log.error("{}: Error extracting ModificationPayload", shardName, e);
+        } catch (IOException e) {
+            log.error("{}: Error extracting payload", shardName, e);
         }
     }
 
     private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
-        DataTreeModification delegate = tx.getDelegate();
-        delegate.ready();
-        store.validate(delegate);
-        store.commit(store.prepare(delegate));
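+        // The explicit ready/validate/prepare/commit sequence now lives inside
+        // ShardDataTree; the coordinator just hands over the pruned result.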
+        store.commit(tx.getResultingModification());
     }
 
     /**
@@ -106,8 +97,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
         log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
         try {
             commitTransaction(transaction);
-        } catch (DataValidationFailedException e) {
-            log.error("{}: Failed to apply recovery batch", shardName, e);
+        } catch (Exception e) {
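+            // A failed batch is fatal to recovery: dump the offending modification to disk
+            // so it can be inspected, then propagate the failure.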
+            File file = new File(System.getProperty("karaf.data", "."),
+                    "failed-recovery-batch-" + shardName + ".out");
+            DataTreeModificationOutput.toFile(file, transaction.getResultingModification());
+            throw new RuntimeException(String.format(
+                    "%s: Failed to apply recovery batch. Modification data was written to file %s",
+                    shardName, file), e);
         }
         transaction = null;
     }
@@ -121,13 +117,34 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applying recovered snapshot", shardName);
 
-        final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-        final PruningDataTreeModification tx = new PruningDataTreeModification(store.takeSnapshot().newModification(), validNamespaces);
-        tx.write(ROOT, node);
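+        // Snapshot bytes are now decoded via ShardDataTreeSnapshot rather than the raw
+        // NormalizedNode serialization used previously.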
+        final ShardDataTreeSnapshot snapshot;
+        try {
+            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
+        } catch (IOException e) {
+            log.error("{}: failed to deserialize snapshot", e);
+            throw Throwables.propagate(e);
+        }
+
+        final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
+                store.getDataTree(), store.getSchemaContext());
+
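+        // Rebuild the data tree from the snapshot's root node; the pruning modification
+        // discards anything no longer valid against the current schema.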
+        final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
+        tx.write(YangInstanceIdentifier.EMPTY, node);
+
         try {
             commitTransaction(tx);
-        } catch (DataValidationFailedException e) {
-            log.error("{}: Failed to apply recovery snapshot", shardName, e);
+        } catch (Exception e) {
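+            // Preserve the snapshot contents as XML for post-mortem analysis before
+            // failing recovery.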
+            File file = new File(System.getProperty("karaf.data", "."),
+                    "failed-recovery-snapshot-" + shardName + ".xml");
+            NormalizedNodeXMLOutput.toFile(file, node);
+            throw new RuntimeException(String.format(
+                    "%s: Failed to apply recovery snapshot. Node data was written to file %s",
+                    shardName, file), e);
         }
     }
+
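+    /**
+     * Returns the serialized snapshot, if any, that was supplied to the constructor for
+     * restoring this shard's state during recovery.
+     */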
+    @Override
+    public byte[] getRestoreFromSnapshot() {
+        return restoreFromSnapshot;
+    }
 }
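
For context, here is a rough sketch (not part of this patch) of the call sequence a RaftActorRecoveryCohort such as ShardRecoveryCoordinator is driven through during recovery: the recovery snapshot, if any, is applied first, then journal entries are replayed in bounded batches. The SimpleRecoveryCohort interface and RecoveryDriver class below are hypothetical stand-ins that merely mirror the cohort methods touched by this patch; the real driver lives in the RaftActor recovery support code in sal-akka-raft.

// Hypothetical, simplified sketch of the recovery sequence; not OpenDaylight API.
import java.util.List;

interface SimpleRecoveryCohort {
    void applyRecoverySnapshot(byte[] snapshotBytes);
    void startLogRecoveryBatch(int maxBatchSize);
    void appendRecoveredLogEntry(Object payload);
    void applyCurrentLogRecoveryBatch();
}

final class RecoveryDriver {
    void recover(SimpleRecoveryCohort cohort, byte[] snapshot, List<Object> journal, int batchSize) {
        if (snapshot != null) {
            // A snapshot, when present, re-creates the entire data tree in one shot.
            cohort.applyRecoverySnapshot(snapshot);
        }

        int inBatch = 0;
        for (Object payload : journal) {
            if (inBatch == 0) {
                cohort.startLogRecoveryBatch(batchSize);
            }
            cohort.appendRecoveredLogEntry(payload);
            if (++inBatch == batchSize) {
                // Each full batch is committed as a single pruned modification.
                cohort.applyCurrentLogRecoveryBatch();
                inBatch = 0;
            }
        }
        if (inBatch > 0) {
            cohort.applyCurrentLogRecoveryBatch();
        }
    }
}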