Address review comments
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardRecoveryCoordinator.java
index 7e547d7257dc495dc34a796d1c7b7a0944cb0e76..5b7f289eed4ea2fa37296a07b5e47e54af2201ac 100644 (file)
@@ -7,19 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.File;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 
@@ -30,74 +24,57 @@ import org.slf4j.Logger;
  * committed to the data store in the order the corresponding snapshot or log batch are received
  * to preserve data store integrity.
  *
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
  */
-class ShardRecoveryCoordinator {
-
-    private final InMemoryDOMDataStore store;
-    private List<ModificationPayload> currentLogRecoveryBatch;
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
+    private final ShardDataTree store;
     private final String shardName;
     private final Logger log;
+    private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
-        this.store = store;
-        this.shardName = shardName;
-        this.log = log;
-    }
+    private boolean open;
 
-    void startLogRecoveryBatch(int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+    ShardRecoveryCoordinator(final ShardDataTree store,  final byte[] restoreFromSnapshot, final String shardName, final Logger log) {
+        this.store = Preconditions.checkNotNull(store);
+        this.shardName = Preconditions.checkNotNull(shardName);
+        this.log = Preconditions.checkNotNull(log);
 
-        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        this.restoreFromSnapshot = restoreFromSnapshot;
     }
 
-    void appendRecoveredLogPayload(Payload payload) {
-        try {
-            if(payload instanceof ModificationPayload) {
-                currentLogRecoveryBatch.add((ModificationPayload) payload);
-            } else if (payload instanceof CompositeModificationPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationPayload) payload).getModification())));
-            } else if (payload instanceof CompositeModificationByteStringPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationByteStringPayload) payload).getModification())));
-            } else {
-                log.error("{}: Unknown payload {} received during recovery", shardName, payload);
-            }
-        } catch (IOException e) {
-            log.error("{}: Error extracting ModificationPayload", shardName, e);
-        }
-
+    @Override
+    public void startLogRecoveryBatch(final int maxBatchSize) {
+        log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        open = true;
     }
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+    @Override
+    public void appendRecoveredLogEntry(final Payload payload) {
+        Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
+
         try {
-            commitCohort.preCommit().get();
-            commitCohort.commit().get();
+            store.applyRecoveryPayload(payload);
         } catch (Exception e) {
-            log.error("{}: Failed to commit Tx on recovery", shardName, e);
+            log.error("{}: failed to apply payload {}", shardName, payload, e);
+            throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+                shardName, payload), e);
         }
     }
 
     /**
      * Applies the current batched log entries to the data store.
      */
-    void applyCurrentLogRecoveryBatch() {
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
-
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
-        for(ModificationPayload payload: currentLogRecoveryBatch) {
-            try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
-            } catch (Exception e) {
-                log.error("{}: Error extracting ModificationPayload", shardName, e);
-            }
-        }
-
-        commitTransaction(writeTx);
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
+        Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+        open = false;
+    }
 
-        currentLogRecoveryBatch = null;
+    private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+        final File file = new File(System.getProperty("karaf.data", "."),
+            "failed-recovery-" + kind + "-" + shardName + ".xml");
+        NormalizedNodeXMLOutput.toFile(file, node);
+        return file;
     }
 
     /**
@@ -105,15 +82,30 @@ class ShardRecoveryCoordinator {
      *
      * @param snapshotBytes the serialized snapshot
      */
-    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applyng recovered sbapshot", shardName);
-
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+    @Override
+    public void applyRecoverySnapshot(final byte[] snapshotBytes) {
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        final ShardDataTreeSnapshot snapshot;
+        try {
+            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
+        } catch (Exception e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
+            throw Throwables.propagate(e);
+        }
 
-        writeTx.write(YangInstanceIdentifier.builder().build(), node);
+        try {
+            store.applyRecoverySnapshot(snapshot);
+        } catch (Exception e) {
+            final File f = writeRoot("snapshot", snapshot.getRootNode().orElse(null));
+            throw new IllegalStateException(String.format(
+                    "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
+                    shardName, snapshot, f), e);
+        }
+    }
 
-        commitTransaction(writeTx);
+    @Override
+    public byte[] getRestoreFromSnapshot() {
+        return restoreFromSnapshot;
     }
 }