CDS: use internal DataTree instance
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
index 01a124b6977c801e3f273c57341efe91d97c52b2..f9d305001567ad4b47706169de873ffafb36fb47 100644 (file)
@@ -17,11 +17,12 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 
 /**
@@ -31,16 +32,16 @@ import org.slf4j.Logger;
  * committed to the data store in the order the corresponding snapshot or log batch are received
  * to preserve data store integrity.
  *
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-
-    private final InMemoryDOMDataStore store;
+    private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+    private final ShardDataTree store;
     private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
     private final Logger log;
 
-    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+    ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
         this.store = store;
         this.shardName = shardName;
         this.log = log;
@@ -73,8 +74,8 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     }
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+    private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
         try {
             commitCohort.preCommit().get();
             commitCohort.commit().get();
@@ -90,10 +91,11 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyCurrentLogRecoveryBatch() {
         log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
 
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
-        for(ModificationPayload payload: currentLogRecoveryBatch) {
+        ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
+        DataTreeModification snapshot = writeTx.getSnapshot();
+        for (ModificationPayload payload : currentLogRecoveryBatch) {
             try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
             } catch (Exception e) {
                 log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
@@ -111,14 +113,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applyng recovered sbapshot", shardName);
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        // Intentionally bypass normal transaction to side-step persistence/replication
+        final DataTree tree = store.getDataTree();
+        DataTreeModification writeTx = tree.takeSnapshot().newModification();
 
         NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-        writeTx.write(YangInstanceIdentifier.builder().build(), node);
-
-        commitTransaction(writeTx);
+        writeTx.write(ROOT, node);
+        writeTx.ready();
+        try {
+            tree.validate(writeTx);
+            tree.commit(tree.prepare(writeTx));
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to validate recovery snapshot", shardName, e);
+        }
     }
 }