Snapshot and journal export on recovery 53/90453/81
authortadei.bilan <tadei.bilan@pantheon.tech>
Mon, 15 Jun 2020 13:31:00 +0000 (16:31 +0300)
committerRobert Varga <nite@hq.sk>
Tue, 29 Jun 2021 12:28:18 +0000 (12:28 +0000)
Added ability to export snapshot and journal content into json file
during recovery.

JIRA: CONTROLLER-1955
Change-Id: Ic2d6181ab56d7b413f06ed91cf5f9d37e3aa2029
Signed-off-by: tadei.bilan <tadei.bilan@pantheon.tech>
Signed-off-by: Oleksii Mozghovyi <oleksii.mozghovyi@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
15 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospector.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/JsonExportActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextIntrospectorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/JsonExportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedJournalExport.json [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedSnapshotExport.json [new file with mode: 0644]

index e65c8f7..6ce9fc2 100644 (file)
@@ -131,3 +131,24 @@ snapshot-on-root-overwrite=false
 
 # Enable lz4 compression for snapshots sent from leader to followers
 #use-lz4-compression=true
+
+# Export snapshot and journal content after recovery, possible modes: off, json
+#
+# Journal Json structure:
+#       Entries : [
+#            Entry : [
+#                Node: [
+#                   Path : {},
+#                   ModificationType : {},
+#                   Data : {}
+#                ]
+#            ]
+#        ]
+#
+# Snapshot Json structure:
+#       RootNode : {}
+#
+export-on-recovery=off
+
+# Directory name for export files
+#recovery-export-base-dir=persistence-export
\ No newline at end of file
index 144dde2..cab3b3b 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-codec-xml</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-codec-gson</artifactId>
+    </dependency>
     <dependency>
       <groupId>tech.pantheon.triemap</groupId>
       <artifactId>triemap</artifactId>
index 2987ea2..7682c64 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,8 @@ public class DatastoreContext implements ClientActorConfig {
             TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
     public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
     public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
+    public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
+    public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
 
     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
 
@@ -102,6 +105,8 @@ public class DatastoreContext implements ClientActorConfig {
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
     private boolean useLz4Compression = false;
+    private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
+    private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -148,6 +153,8 @@ public class DatastoreContext implements ClientActorConfig {
         this.noProgressTimeout = other.noProgressTimeout;
         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
         this.useLz4Compression = other.useLz4Compression;
+        this.exportOnRecovery = other.exportOnRecovery;
+        this.recoveryExportBaseDir = other.recoveryExportBaseDir;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -366,6 +373,14 @@ public class DatastoreContext implements ClientActorConfig {
         return useLz4Compression;
     }
 
+    public ExportOnRecovery getExportOnRecovery() {
+        return exportOnRecovery;
+    }
+
+    public String getRecoveryExportBaseDir() {
+        return recoveryExportBaseDir;
+    }
+
     @Override
     public int getMaximumMessageSliceSize() {
         return maximumMessageSliceSize;
@@ -617,6 +632,16 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder exportOnRecovery(final ExportOnRecovery value) {
+            datastoreContext.exportOnRecovery = value;
+            return this;
+        }
+
+        public Builder recoveryExportBaseDir(final String value) {
+            datastoreContext.recoveryExportBaseDir = value;
+            return this;
+        }
+
         /**
          * For unit tests only.
          */
index c94f869..0ee005a 100644 (file)
@@ -20,8 +20,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import javax.management.ConstructorParameters;
@@ -337,8 +339,12 @@ public class DatastoreContextIntrospector {
 
             // Call the setter method on the Builder instance.
             final Method setter = BUILDER_SETTERS.get(key);
-            setter.invoke(builder, constructorValueRecursively(
-                    Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+            if (value.getClass().isEnum()) {
+                setter.invoke(builder, value);
+            } else {
+                setter.invoke(builder, constructorValueRecursively(
+                        Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+            }
 
             return true;
         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
@@ -373,6 +379,18 @@ public class DatastoreContextIntrospector {
         LOG.debug("Type for property {}: {}, converting value {} ({})",
                 name, propertyType.getSimpleName(), from, from.getClass().getSimpleName());
 
+        if (propertyType.isEnum()) {
+            try {
+                final Method enumConstructor = propertyType.getDeclaredMethod("forName", String.class);
+                final Object optional =  enumConstructor.invoke(null, from.toString().toLowerCase(Locale.ROOT));
+                if (optional instanceof Optional) {
+                    return ((Optional<Object>)optional).orElseThrow();
+                }
+            } catch (NoSuchMethodException e) {
+                LOG.error("Error constructing value ({}) for enum {}", from, propertyType);
+            }
+        }
+
         // Recurse the chain of constructors depth-first to get the resulting value. Eg, if the
         // property type is the yang-generated NonZeroUint32Type, it's constructor takes a Long so
         // we have to first construct a Long instance from the input value.
index 9c548e5..fb0ef03 100644 (file)
@@ -16,9 +16,12 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.ExtendedActorSystem;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +67,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -99,12 +103,14 @@ import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
@@ -206,6 +212,10 @@ public class Shard extends RaftActor {
 
     private final MessageAssembler requestMessageAssembler;
 
+    private final ExportOnRecovery exportOnRecovery;
+
+    private final ActorRef exportActor;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -215,6 +225,18 @@ public class Shard extends RaftActor {
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
+        this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+
+        switch (exportOnRecovery) {
+            case Json:
+                exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+                        datastoreContext.getRecoveryExportBaseDir()));
+                break;
+            case Off:
+            default:
+                exportActor = null;
+                break;
+        }
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -308,6 +330,25 @@ public class Shard extends RaftActor {
             getSender());
 
         super.handleRecover(message);
+
+        switch (exportOnRecovery) {
+            case Json:
+                if (message instanceof SnapshotOffer) {
+                    exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
+                            ActorRef.noSender());
+                } else if (message instanceof ReplicatedLogEntry) {
+                    exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+                            ActorRef.noSender());
+                } else if (message instanceof RecoveryCompleted) {
+                    exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
+                    exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                }
+                break;
+            case Off:
+            default:
+                break;
+        }
+
         if (LOG.isTraceEnabled()) {
             appendEntriesReplyTracker.begin();
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/JsonExportActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/JsonExportActor.java
new file mode 100644 (file)
index 0000000..369fc0b
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Props;
+import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public final class JsonExportActor extends AbstractUntypedActor {
+    // Internal messages
+    public static final class ExportSnapshot {
+        private final String id;
+
+        private final DataTreeCandidate dataTreeCandidate;
+
+        public ExportSnapshot(final DataTreeCandidate candidate, final String id) {
+            this.dataTreeCandidate = requireNonNull(candidate);
+            this.id = requireNonNull(id);
+        }
+    }
+
+    public static final class ExportJournal {
+        private final ReplicatedLogEntry replicatedLogEntry;
+
+        public ExportJournal(final ReplicatedLogEntry replicatedLogEntry) {
+            this.replicatedLogEntry = requireNonNull(replicatedLogEntry);
+        }
+    }
+
+    public static final class FinishExport {
+        private final String id;
+
+        public FinishExport(final String id) {
+            this.id = requireNonNull(id);
+        }
+    }
+
+    private final List<ReplicatedLogEntry> entries = new ArrayList<>();
+    private final EffectiveModelContext schemaContext;
+    private final Path baseDirPath;
+
+    private JsonExportActor(final EffectiveModelContext schemaContext, final Path dirPath) {
+        this.schemaContext = requireNonNull(schemaContext);
+        this.baseDirPath = requireNonNull(dirPath);
+    }
+
+    public static Props props(final EffectiveModelContext schemaContext, final String dirPath) {
+        return Props.create(JsonExportActor.class, schemaContext, Paths.get(dirPath));
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof ExportSnapshot) {
+            onExportSnapshot((ExportSnapshot) message);
+        } else if (message instanceof ExportJournal) {
+            onExportJournal((ExportJournal) message);
+        } else if (message instanceof FinishExport) {
+            onFinishExport((FinishExport)message);
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    private void onExportSnapshot(final ExportSnapshot exportSnapshot) {
+        final Path snapshotDir = baseDirPath.resolve("snapshots");
+        createDir(snapshotDir);
+
+        final Path filePath = snapshotDir.resolve(exportSnapshot.id + "-snapshot.json");
+        LOG.debug("Creating JSON file : {}", filePath);
+
+        final NormalizedNode root = exportSnapshot.dataTreeCandidate.getRootNode().getDataAfter().get();
+        checkState(root instanceof NormalizedNodeContainer, "Unexpected root %s", root);
+
+        writeSnapshot(filePath, (NormalizedNodeContainer<?>) root);
+        LOG.debug("Created JSON file: {}", filePath);
+    }
+
+    private void onExportJournal(final ExportJournal exportJournal) {
+        entries.add(exportJournal.replicatedLogEntry);
+    }
+
+    private void onFinishExport(final FinishExport finishExport) {
+        final Path journalDir = baseDirPath.resolve("journals");
+        createDir(journalDir);
+
+        final Path filePath = journalDir.resolve(finishExport.id + "-journal.json");
+        LOG.debug("Creating JSON file : {}", filePath);
+        writeJournal(filePath);
+        LOG.debug("Created JSON file: {}", filePath);
+    }
+
+    private void writeSnapshot(final Path path, final NormalizedNodeContainer<?> root) {
+        try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
+            jsonWriter.beginObject();
+
+            try (NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(
+                JSONNormalizedNodeStreamWriter.createNestedWriter(
+                    JSONCodecFactorySupplier.RFC7951.getShared(schemaContext), SchemaPath.ROOT, null, jsonWriter),
+                true)) {
+                for (NormalizedNode node : root.body()) {
+                    nnWriter.write(node);
+                }
+            }
+
+            jsonWriter.endObject();
+        } catch (IOException e) {
+            LOG.error("Failed to export stapshot to {}", path, e);
+        }
+    }
+
+    private void writeJournal(final Path path) {
+        try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
+            jsonWriter.beginObject().name("Entries");
+            jsonWriter.beginArray();
+            for (ReplicatedLogEntry entry : entries) {
+                final Payload data = entry.getData();
+                if (data instanceof CommitTransactionPayload) {
+                    final CommitTransactionPayload payload = (CommitTransactionPayload) entry.getData();
+                    final DataTreeCandidate candidate = payload.getCandidate().getValue().getCandidate();
+                    writeNode(jsonWriter, candidate);
+                } else {
+                    jsonWriter.beginObject().name("Payload").value(data.toString()).endObject();
+                }
+            }
+            jsonWriter.endArray();
+            jsonWriter.endObject();
+        } catch (IOException e) {
+            LOG.error("Failed to export journal to {}", path, e);
+        }
+    }
+
+    private static void writeNode(final JsonWriter writer, final DataTreeCandidate candidate) throws IOException {
+        writer.beginObject();
+        writer.name("Entry");
+        writer.beginArray();
+        doWriteNode(writer, candidate.getRootPath(), candidate.getRootNode());
+        writer.endArray();
+        writer.endObject();
+    }
+
+    private static void doWriteNode(final JsonWriter writer, final YangInstanceIdentifier path,
+            final DataTreeCandidateNode node) throws IOException {
+        switch (node.getModificationType()) {
+            case APPEARED:
+            case DISAPPEARED:
+            case SUBTREE_MODIFIED:
+                NodeIterator iterator = new NodeIterator(null, path, node.getChildNodes().iterator());
+                do {
+                    iterator = iterator.next(writer);
+                } while (iterator != null);
+                break;
+            case DELETE:
+            case UNMODIFIED:
+            case WRITE:
+                outputNodeInfo(writer, path, node);
+                break;
+            default:
+                outputDefault(writer, path, node);
+        }
+    }
+
+    private static void outputNodeInfo(final JsonWriter writer, final YangInstanceIdentifier path,
+                                       final DataTreeCandidateNode node) throws IOException {
+        final ModificationType modificationType = node.getModificationType();
+
+        writer.beginObject().name("Node");
+        writer.beginArray();
+        writer.beginObject().name("Path").value(path.toString()).endObject();
+        writer.beginObject().name("ModificationType").value(modificationType.toString()).endObject();
+        if (modificationType == ModificationType.WRITE) {
+            writer.beginObject().name("Data").value(node.getDataAfter().get().body().toString()).endObject();
+        }
+        writer.endArray();
+        writer.endObject();
+    }
+
+    private static void outputDefault(final JsonWriter writer, final YangInstanceIdentifier path,
+                                      final DataTreeCandidateNode node) throws IOException {
+        writer.beginObject().name("Node");
+        writer.beginArray();
+        writer.beginObject().name("Path").value(path.toString()).endObject();
+        writer.beginObject().name("ModificationType")
+                .value("UNSUPPORTED MODIFICATION: " + node.getModificationType()).endObject();
+        writer.endArray();
+        writer.endObject();
+    }
+
+    private void createDir(final Path path) {
+        try {
+            Files.createDirectories(path);
+        } catch (IOException e) {
+            LOG.warn("Directory {} cannot be created", path, e);
+        }
+    }
+
+    private static final class NodeIterator {
+        private final Iterator<DataTreeCandidateNode> iterator;
+        private final YangInstanceIdentifier path;
+        private final NodeIterator parent;
+
+        NodeIterator(final @Nullable NodeIterator parent, final YangInstanceIdentifier path,
+                     final Iterator<DataTreeCandidateNode> iterator) {
+            this.iterator = requireNonNull(iterator);
+            this.path = requireNonNull(path);
+            this.parent = parent;
+        }
+
+        NodeIterator next(final JsonWriter writer) throws IOException {
+            while (iterator.hasNext()) {
+                final DataTreeCandidateNode node = iterator.next();
+                final YangInstanceIdentifier child = path.node(node.getIdentifier());
+
+                switch (node.getModificationType()) {
+                    case APPEARED:
+                    case DISAPPEARED:
+                    case SUBTREE_MODIFIED:
+                        return new NodeIterator(this, child, node.getChildNodes().iterator());
+                    case DELETE:
+                    case UNMODIFIED:
+                    case WRITE:
+                        outputNodeInfo(writer, path, node);
+                        break;
+                    default:
+                        outputDefault(writer, child, node);
+                }
+            }
+
+            return parent;
+        }
+    }
+}
index da0c748..a51a18c 100644 (file)
@@ -303,6 +303,23 @@ module distributed-datastore-provider {
             description "Use lz4 compression for snapshots, sent from leader to follower, for snapshots stored
                         by LocalSnapshotStore, use akka.conf configuration.";
         }
+
+        leaf export-on-recovery {
+            default off;
+            type enumeration {
+                enum off;
+                enum json;
+            }
+            description "Export snapshot and journal during recovery. Possible modes: off(default),
+                        json(export to json files). Note that in case of large snapshot,
+                        export will take a lot of time.";
+        }
+
+        leaf recovery-export-base-dir {
+            default persistence-export;
+            type string;
+            description "Directory name for snapshot and journal dumps.";
+        }
     }
 
     container data-store-properties-container {
index 8c6e841..31f2b6b 100644 (file)
@@ -93,21 +93,20 @@ import scala.concurrent.duration.FiniteDuration;
 public abstract class AbstractShardTest extends AbstractActorTest {
     protected static final EffectiveModelContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
-    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
-
+    protected static final AtomicInteger SHARD_NUM = new AtomicInteger();
     protected static final int HEARTBEAT_MILLIS = 100;
 
-    protected final ShardIdentifier shardID = ShardIdentifier.create("inventory", MemberName.forName("member-1"),
-        "config" + NEXT_SHARD_NUM.getAndIncrement());
-
     protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
             .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000)
             .shardHeartbeatIntervalInMillis(HEARTBEAT_MILLIS);
 
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    protected final int nextShardNum = SHARD_NUM.getAndIncrement();
+    protected final ShardIdentifier shardID = ShardIdentifier.create("inventory", MemberName.forName("member-1"),
+        "config" + nextShardNum);
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         InMemorySnapshotStore.clear();
         InMemoryJournal.clear();
     }
@@ -132,7 +131,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
             .schemaContextProvider(() -> SCHEMA_CONTEXT);
     }
 
-    protected void testRecovery(final Set<Integer> listEntryKeys) throws Exception {
+    protected void testRecovery(final Set<Integer> listEntryKeys, final boolean stopActorOnFinish) throws Exception {
         // Create the actor and wait for recovery complete.
 
         final int nListEntries = listEntryKeys.size();
@@ -184,7 +183,9 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         assertEquals("Last applied", nListEntries,
                 shard.underlyingActor().getShardMBean().getLastApplied());
 
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        if (stopActorOnFinish) {
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
     }
 
     protected void verifyLastApplied(final TestActorRef<Shard> shard, final long expectedValue) {
index bbe676b..d0a2882 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
 
 /**
@@ -76,6 +77,8 @@ public class DatastoreContextIntrospectorTest {
         properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
         properties.put("persistent", "false");
         properties.put("initial-payload-serialized-buffer-capacity", "600");
+        properties.put("export-on-recovery", "json");
+        properties.put("recovery-json-dump", "persistence-export");
 
         boolean updated = introspector.update(properties);
         assertTrue("updated", updated);
@@ -99,6 +102,8 @@ public class DatastoreContextIntrospectorTest {
         assertEquals(901, context.getShardBatchedModificationCount());
         assertEquals(200, context.getTransactionCreationInitialRateLimit());
         assertEquals(600, context.getInitialPayloadSerializedBufferCapacity());
+        assertEquals("persistence-export", context.getRecoveryExportBaseDir());
+        assertEquals(ExportOnRecovery.Json, context.getExportOnRecovery());
         assertFalse(context.isPersistent());
 
         properties.put("shard-transaction-idle-timeout-in-minutes", "32");
index 4d47f7f..0917a52 100644 (file)
@@ -17,6 +17,7 @@ import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEF
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_EXPORT_BASE_DIR;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
 import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
 
 public class DatastoreContextTest {
 
@@ -69,6 +71,7 @@ public class DatastoreContextTest {
         assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT,
                 context.getShardBatchedModificationCount());
         assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE, context.getMaximumMessageSliceSize());
+        assertEquals(DEFAULT_RECOVERY_EXPORT_BASE_DIR, context.getRecoveryExportBaseDir());
     }
 
     @Test
@@ -108,6 +111,8 @@ public class DatastoreContextTest {
                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
         builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
         builder.initialPayloadSerializedBufferCapacity(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1);
+        builder.exportOnRecovery(ExportOnRecovery.Json);
+        builder.recoveryExportBaseDir(DEFAULT_RECOVERY_EXPORT_BASE_DIR + "-new");
 
         DatastoreContext context = builder.build();
 
@@ -159,5 +164,8 @@ public class DatastoreContextTest {
         assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
         assertEquals(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1,
                 context.getInitialPayloadSerializedBufferCapacity());
+        assertEquals(DEFAULT_RECOVERY_EXPORT_BASE_DIR + "-new",
+                context.getRecoveryExportBaseDir());
+        assertEquals(ExportOnRecovery.Json, context.getExportOnRecovery());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/JsonExportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/JsonExportTest.java
new file mode 100644 (file)
index 0000000..fc5665b
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class JsonExportTest extends AbstractShardTest {
+    private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
+            + "InMemorySnapshotStore and journal recovery seq number will start from 1";
+    private static final String EXPECTED_JOURNAL_FILE = "expectedJournalExport.json";
+    private static final String EXPECTED_SNAPSHOT_FILE = "expectedSnapshotExport.json";
+    private static String actualJournalFilePath;
+    private static String actualSnapshotFilePath;
+    private DatastoreContext datastoreContext;
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        final File exportTmpFolder = temporaryFolder.newFolder("persistence-export");
+        actualJournalFilePath = exportTmpFolder.getAbsolutePath() + "/journals/"
+            + "member-1-shard-inventory-config" + nextShardNum + "-journal.json";
+        actualSnapshotFilePath = exportTmpFolder.getAbsolutePath() + "/snapshots/"
+            + "member-1-shard-inventory-config" + nextShardNum + "-snapshot.json";
+        datastoreContext = DatastoreContext.newBuilder().shardJournalRecoveryLogBatchSize(1)
+            .shardSnapshotBatchCount(5000).shardHeartbeatIntervalInMillis(HEARTBEAT_MILLIS).persistent(true)
+            .exportOnRecovery(ExportOnRecovery.Json)
+            .recoveryExportBaseDir(exportTmpFolder.getAbsolutePath()).build();
+    }
+
+    @Override
+    protected DatastoreContext newDatastoreContext() {
+        return datastoreContext;
+    }
+
+    @Test
+    public void testJsonExport() throws Exception {
+        // Set up the InMemorySnapshotStore.
+        final DataTree source = setupInMemorySnapshotStore();
+
+        final DataTreeModification writeMod = source.takeSnapshot().newModification();
+        writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+        writeMod.ready();
+        InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
+        // Set up the InMemoryJournal.
+        InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
+                payloadForModification(source, writeMod, nextTransactionId())));
+
+        final int nListEntries = 16;
+        final Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
+        for (int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(i);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+            final DataTreeModification mod = source.takeSnapshot().newModification();
+            mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            mod.ready();
+
+            InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
+                    payloadForModification(source, mod, nextTransactionId())));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
+                new ApplyJournalEntries(nListEntries));
+
+        testRecovery(listEntryKeys, false);
+
+        verifyJournalExport();
+        verifySnapshotExport();
+    }
+
+    private static void verifyJournalExport() throws IOException {
+        final String expectedJournalData = readExpectedFile(EXPECTED_JOURNAL_FILE);
+        final String actualJournalData = readActualFile(actualJournalFilePath);
+        assertEquals("Exported journal is not expected ", expectedJournalData, actualJournalData);
+    }
+
+    private static void verifySnapshotExport() throws IOException {
+        final String expectedSnapshotData = readExpectedFile(EXPECTED_SNAPSHOT_FILE);
+        final String actualSnapshotData = readActualFile(actualSnapshotFilePath);
+        assertEquals("Exported snapshot is not expected ", expectedSnapshotData, actualSnapshotData);
+    }
+
+    private static String readExpectedFile(final String filePath) throws IOException {
+        final File exportFile = new File(JsonExportTest.class.getClassLoader().getResource(filePath).getFile());
+        return new String(Files.readAllBytes(Path.of(exportFile.getPath())));
+    }
+
+    private static String readActualFile(final String filePath) throws IOException {
+        final File exportFile = new File(filePath);
+        await().atMost(10, TimeUnit.SECONDS).until(exportFile::exists);
+        return new String(Files.readAllBytes(Path.of(filePath)));
+    }
+}
index f396eae..34b984f 100644 (file)
@@ -388,7 +388,7 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
             new ApplyJournalEntries(nListEntries));
 
-        testRecovery(listEntryKeys);
+        testRecovery(listEntryKeys, true);
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedJournalExport.json b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedJournalExport.json
new file mode 100644 (file)
index 0000000..a7b22c6
--- /dev/null
@@ -0,0 +1 @@
+{"Entries":[{"Entry":[{"Node":[{"Path":"/"},{"ModificationType":"UNMODIFIED"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=1}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=2}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=3}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=4}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=5}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=6}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=7}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=8}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=9}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=10}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=11}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=12}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=13}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=14}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=15}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=16}]"}]}]}]}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedSnapshotExport.json b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/expectedSnapshotExport.json
new file mode 100644 (file)
index 0000000..e8be65b
--- /dev/null
@@ -0,0 +1 @@
+{"odl-datastore-test:test":{}}
\ No newline at end of file