# Enable lz4 compression for snapshots sent from leader to followers
#use-lz4-compression=true
+
+# Export snapshot and journal content after recovery, possible modes: off, json
+#
+# Journal Json structure:
+# Entries : [
+# Entry : [
+# Node: [
+# Path : {},
+# ModificationType : {},
+# Data : {}
+# ]
+# ]
+# ]
+#
+# Snapshot Json structure:
+# RootNode : {}
+#
+export-on-recovery=off
+
+# Directory name for export files
+#recovery-export-base-dir=persistence-export
\ No newline at end of file
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-codec-xml</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-codec-gson</artifactId>
+ </dependency>
<dependency>
<groupId>tech.pantheon.triemap</groupId>
<artifactId>triemap</artifactId>
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_MAX_MESSAGE_SLICE_SIZE = 2048 * 1000; // 2MB
public static final int DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY = 512;
+ public static final ExportOnRecovery DEFAULT_EXPORT_ON_RECOVERY = ExportOnRecovery.Off;
+ public static final String DEFAULT_RECOVERY_EXPORT_BASE_DIR = "persistence-export";
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
private boolean useLz4Compression = false;
+ private ExportOnRecovery exportOnRecovery = DEFAULT_EXPORT_ON_RECOVERY;
+ private String recoveryExportBaseDir = DEFAULT_RECOVERY_EXPORT_BASE_DIR;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
this.noProgressTimeout = other.noProgressTimeout;
this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
this.useLz4Compression = other.useLz4Compression;
+ this.exportOnRecovery = other.exportOnRecovery;
+ this.recoveryExportBaseDir = other.recoveryExportBaseDir;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return useLz4Compression;
}
+ public ExportOnRecovery getExportOnRecovery() {
+ return exportOnRecovery;
+ }
+
+ public String getRecoveryExportBaseDir() {
+ return recoveryExportBaseDir;
+ }
+
@Override
public int getMaximumMessageSliceSize() {
return maximumMessageSliceSize;
return this;
}
+ public Builder exportOnRecovery(final ExportOnRecovery value) {
+ datastoreContext.exportOnRecovery = value;
+ return this;
+ }
+
+ public Builder recoveryExportBaseDir(final String value) {
+ datastoreContext.recoveryExportBaseDir = value;
+ return this;
+ }
+
/**
* For unit tests only.
*/
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import javax.management.ConstructorParameters;
// Call the setter method on the Builder instance.
final Method setter = BUILDER_SETTERS.get(key);
- setter.invoke(builder, constructorValueRecursively(
- Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+ if (value.getClass().isEnum()) {
+ setter.invoke(builder, value);
+ } else {
+ setter.invoke(builder, constructorValueRecursively(
+ Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
+ }
return true;
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
LOG.debug("Type for property {}: {}, converting value {} ({})",
name, propertyType.getSimpleName(), from, from.getClass().getSimpleName());
+ if (propertyType.isEnum()) {
+ try {
+ final Method enumConstructor = propertyType.getDeclaredMethod("forName", String.class);
+ final Object optional = enumConstructor.invoke(null, from.toString().toLowerCase(Locale.ROOT));
+ if (optional instanceof Optional) {
+ return ((Optional<Object>)optional).orElseThrow();
+ }
+ } catch (NoSuchMethodException e) {
+ LOG.error("Error constructing value ({}) for enum {}", from, propertyType);
+ }
+ }
+
// Recurse the chain of constructors depth-first to get the resulting value. Eg, if the
// property type is the yang-generated NonZeroUint32Type, it's constructor takes a Long so
// we have to first construct a Long instance from the input value.
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.ExtendedActorSystem;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
private final MessageAssembler requestMessageAssembler;
+ private final ExportOnRecovery exportOnRecovery;
+
+ private final ActorRef exportActor;
+
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
+ this.exportOnRecovery = datastoreContext.getExportOnRecovery();
+
+ switch (exportOnRecovery) {
+ case Json:
+ exportActor = getContext().actorOf(JsonExportActor.props(builder.getSchemaContext(),
+ datastoreContext.getRecoveryExportBaseDir()));
+ break;
+ case Off:
+ default:
+ exportActor = null;
+ break;
+ }
setPersistence(datastoreContext.isPersistent());
getSender());
super.handleRecover(message);
+
+ switch (exportOnRecovery) {
+ case Json:
+ if (message instanceof SnapshotOffer) {
+ exportActor.tell(new JsonExportActor.ExportSnapshot(store.readCurrentData().get(), name),
+ ActorRef.noSender());
+ } else if (message instanceof ReplicatedLogEntry) {
+ exportActor.tell(new JsonExportActor.ExportJournal((ReplicatedLogEntry) message),
+ ActorRef.noSender());
+ } else if (message instanceof RecoveryCompleted) {
+ exportActor.tell(new JsonExportActor.FinishExport(name), ActorRef.noSender());
+ exportActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ break;
+ case Off:
+ default:
+ break;
+ }
+
if (LOG.isTraceEnabled()) {
appendEntriesReplyTracker.begin();
}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.Props;
+import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public final class JsonExportActor extends AbstractUntypedActor {
+ // Internal messages
+ public static final class ExportSnapshot {
+ private final String id;
+
+ private final DataTreeCandidate dataTreeCandidate;
+
+ public ExportSnapshot(final DataTreeCandidate candidate, final String id) {
+ this.dataTreeCandidate = requireNonNull(candidate);
+ this.id = requireNonNull(id);
+ }
+ }
+
+ public static final class ExportJournal {
+ private final ReplicatedLogEntry replicatedLogEntry;
+
+ public ExportJournal(final ReplicatedLogEntry replicatedLogEntry) {
+ this.replicatedLogEntry = requireNonNull(replicatedLogEntry);
+ }
+ }
+
+ public static final class FinishExport {
+ private final String id;
+
+ public FinishExport(final String id) {
+ this.id = requireNonNull(id);
+ }
+ }
+
+ private final List<ReplicatedLogEntry> entries = new ArrayList<>();
+ private final EffectiveModelContext schemaContext;
+ private final Path baseDirPath;
+
+ private JsonExportActor(final EffectiveModelContext schemaContext, final Path dirPath) {
+ this.schemaContext = requireNonNull(schemaContext);
+ this.baseDirPath = requireNonNull(dirPath);
+ }
+
+ public static Props props(final EffectiveModelContext schemaContext, final String dirPath) {
+ return Props.create(JsonExportActor.class, schemaContext, Paths.get(dirPath));
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof ExportSnapshot) {
+ onExportSnapshot((ExportSnapshot) message);
+ } else if (message instanceof ExportJournal) {
+ onExportJournal((ExportJournal) message);
+ } else if (message instanceof FinishExport) {
+ onFinishExport((FinishExport)message);
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ private void onExportSnapshot(final ExportSnapshot exportSnapshot) {
+ final Path snapshotDir = baseDirPath.resolve("snapshots");
+ createDir(snapshotDir);
+
+ final Path filePath = snapshotDir.resolve(exportSnapshot.id + "-snapshot.json");
+ LOG.debug("Creating JSON file : {}", filePath);
+
+ final NormalizedNode root = exportSnapshot.dataTreeCandidate.getRootNode().getDataAfter().get();
+ checkState(root instanceof NormalizedNodeContainer, "Unexpected root %s", root);
+
+ writeSnapshot(filePath, (NormalizedNodeContainer<?>) root);
+ LOG.debug("Created JSON file: {}", filePath);
+ }
+
+ private void onExportJournal(final ExportJournal exportJournal) {
+ entries.add(exportJournal.replicatedLogEntry);
+ }
+
+ private void onFinishExport(final FinishExport finishExport) {
+ final Path journalDir = baseDirPath.resolve("journals");
+ createDir(journalDir);
+
+ final Path filePath = journalDir.resolve(finishExport.id + "-journal.json");
+ LOG.debug("Creating JSON file : {}", filePath);
+ writeJournal(filePath);
+ LOG.debug("Created JSON file: {}", filePath);
+ }
+
+ private void writeSnapshot(final Path path, final NormalizedNodeContainer<?> root) {
+ try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
+ jsonWriter.beginObject();
+
+ try (NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(
+ JSONNormalizedNodeStreamWriter.createNestedWriter(
+ JSONCodecFactorySupplier.RFC7951.getShared(schemaContext), SchemaPath.ROOT, null, jsonWriter),
+ true)) {
+ for (NormalizedNode node : root.body()) {
+ nnWriter.write(node);
+ }
+ }
+
+ jsonWriter.endObject();
+ } catch (IOException e) {
+ LOG.error("Failed to export stapshot to {}", path, e);
+ }
+ }
+
+ private void writeJournal(final Path path) {
+ try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
+ jsonWriter.beginObject().name("Entries");
+ jsonWriter.beginArray();
+ for (ReplicatedLogEntry entry : entries) {
+ final Payload data = entry.getData();
+ if (data instanceof CommitTransactionPayload) {
+ final CommitTransactionPayload payload = (CommitTransactionPayload) entry.getData();
+ final DataTreeCandidate candidate = payload.getCandidate().getValue().getCandidate();
+ writeNode(jsonWriter, candidate);
+ } else {
+ jsonWriter.beginObject().name("Payload").value(data.toString()).endObject();
+ }
+ }
+ jsonWriter.endArray();
+ jsonWriter.endObject();
+ } catch (IOException e) {
+ LOG.error("Failed to export journal to {}", path, e);
+ }
+ }
+
+ private static void writeNode(final JsonWriter writer, final DataTreeCandidate candidate) throws IOException {
+ writer.beginObject();
+ writer.name("Entry");
+ writer.beginArray();
+ doWriteNode(writer, candidate.getRootPath(), candidate.getRootNode());
+ writer.endArray();
+ writer.endObject();
+ }
+
+ private static void doWriteNode(final JsonWriter writer, final YangInstanceIdentifier path,
+ final DataTreeCandidateNode node) throws IOException {
+ switch (node.getModificationType()) {
+ case APPEARED:
+ case DISAPPEARED:
+ case SUBTREE_MODIFIED:
+ NodeIterator iterator = new NodeIterator(null, path, node.getChildNodes().iterator());
+ do {
+ iterator = iterator.next(writer);
+ } while (iterator != null);
+ break;
+ case DELETE:
+ case UNMODIFIED:
+ case WRITE:
+ outputNodeInfo(writer, path, node);
+ break;
+ default:
+ outputDefault(writer, path, node);
+ }
+ }
+
+ private static void outputNodeInfo(final JsonWriter writer, final YangInstanceIdentifier path,
+ final DataTreeCandidateNode node) throws IOException {
+ final ModificationType modificationType = node.getModificationType();
+
+ writer.beginObject().name("Node");
+ writer.beginArray();
+ writer.beginObject().name("Path").value(path.toString()).endObject();
+ writer.beginObject().name("ModificationType").value(modificationType.toString()).endObject();
+ if (modificationType == ModificationType.WRITE) {
+ writer.beginObject().name("Data").value(node.getDataAfter().get().body().toString()).endObject();
+ }
+ writer.endArray();
+ writer.endObject();
+ }
+
+ private static void outputDefault(final JsonWriter writer, final YangInstanceIdentifier path,
+ final DataTreeCandidateNode node) throws IOException {
+ writer.beginObject().name("Node");
+ writer.beginArray();
+ writer.beginObject().name("Path").value(path.toString()).endObject();
+ writer.beginObject().name("ModificationType")
+ .value("UNSUPPORTED MODIFICATION: " + node.getModificationType()).endObject();
+ writer.endArray();
+ writer.endObject();
+ }
+
+ private void createDir(final Path path) {
+ try {
+ Files.createDirectories(path);
+ } catch (IOException e) {
+ LOG.warn("Directory {} cannot be created", path, e);
+ }
+ }
+
+ private static final class NodeIterator {
+ private final Iterator<DataTreeCandidateNode> iterator;
+ private final YangInstanceIdentifier path;
+ private final NodeIterator parent;
+
+ NodeIterator(final @Nullable NodeIterator parent, final YangInstanceIdentifier path,
+ final Iterator<DataTreeCandidateNode> iterator) {
+ this.iterator = requireNonNull(iterator);
+ this.path = requireNonNull(path);
+ this.parent = parent;
+ }
+
+ NodeIterator next(final JsonWriter writer) throws IOException {
+ while (iterator.hasNext()) {
+ final DataTreeCandidateNode node = iterator.next();
+ final YangInstanceIdentifier child = path.node(node.getIdentifier());
+
+ switch (node.getModificationType()) {
+ case APPEARED:
+ case DISAPPEARED:
+ case SUBTREE_MODIFIED:
+ return new NodeIterator(this, child, node.getChildNodes().iterator());
+ case DELETE:
+ case UNMODIFIED:
+ case WRITE:
+ outputNodeInfo(writer, path, node);
+ break;
+ default:
+ outputDefault(writer, child, node);
+ }
+ }
+
+ return parent;
+ }
+ }
+}
description "Use lz4 compression for snapshots, sent from leader to follower, for snapshots stored
by LocalSnapshotStore, use akka.conf configuration.";
}
+
+ leaf export-on-recovery {
+ default off;
+ type enumeration {
+ enum off;
+ enum json;
+ }
+ description "Export snapshot and journal during recovery. Possible modes: off(default),
+ json(export to json files). Note that in case of large snapshot,
+ export will take a lot of time.";
+ }
+
+ leaf recovery-export-base-dir {
+ default persistence-export;
+ type string;
+ description "Directory name for snapshot and journal dumps.";
+ }
}
container data-store-properties-container {
public abstract class AbstractShardTest extends AbstractActorTest {
protected static final EffectiveModelContext SCHEMA_CONTEXT = TestModel.createTestContext();
- private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
-
+ protected static final AtomicInteger SHARD_NUM = new AtomicInteger();
protected static final int HEARTBEAT_MILLIS = 100;
- protected final ShardIdentifier shardID = ShardIdentifier.create("inventory", MemberName.forName("member-1"),
- "config" + NEXT_SHARD_NUM.getAndIncrement());
-
protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000)
.shardHeartbeatIntervalInMillis(HEARTBEAT_MILLIS);
protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+ protected final int nextShardNum = SHARD_NUM.getAndIncrement();
+ protected final ShardIdentifier shardID = ShardIdentifier.create("inventory", MemberName.forName("member-1"),
+ "config" + nextShardNum);
@Before
- public void setUp() {
+ public void setUp() throws Exception {
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
}
.schemaContextProvider(() -> SCHEMA_CONTEXT);
}
- protected void testRecovery(final Set<Integer> listEntryKeys) throws Exception {
+ protected void testRecovery(final Set<Integer> listEntryKeys, final boolean stopActorOnFinish) throws Exception {
// Create the actor and wait for recovery complete.
final int nListEntries = listEntryKeys.size();
assertEquals("Last applied", nListEntries,
shard.underlyingActor().getShardMBean().getLastApplied());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ if (stopActorOnFinish) {
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
}
protected void verifyLastApplied(final TestActorRef<Shard> shard, final long expectedValue) {
@Override
@Before
- public void setUp() {
+ public void setUp() throws Exception {
super.setUp();
createShard();
}
import org.junit.Test;
import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
/**
properties.put("mAx-shaRd-data-STORE-executor-quEUe-size", "3333");
properties.put("persistent", "false");
properties.put("initial-payload-serialized-buffer-capacity", "600");
+ properties.put("export-on-recovery", "json");
+ properties.put("recovery-json-dump", "persistence-export");
boolean updated = introspector.update(properties);
assertTrue("updated", updated);
assertEquals(901, context.getShardBatchedModificationCount());
assertEquals(200, context.getTransactionCreationInitialRateLimit());
assertEquals(600, context.getInitialPayloadSerializedBufferCapacity());
+ assertEquals("persistence-export", context.getRecoveryExportBaseDir());
+ assertEquals(ExportOnRecovery.Json, context.getExportOnRecovery());
assertFalse(context.isPersistent());
properties.put("shard-transaction-idle-timeout-in-minutes", "32");
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_MAX_MESSAGE_SLICE_SIZE;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_MS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_PERSISTENT;
+import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_EXPORT_BASE_DIR;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_RECOVERY_SNAPSHOT_INTERVAL_SECONDS;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
import static org.opendaylight.controller.cluster.datastore.DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
public class DatastoreContextTest {
assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT,
context.getShardBatchedModificationCount());
assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE, context.getMaximumMessageSliceSize());
+ assertEquals(DEFAULT_RECOVERY_EXPORT_BASE_DIR, context.getRecoveryExportBaseDir());
}
@Test
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_STORE_EXECUTOR_QUEUE_SIZE + 1);
builder.maximumMessageSliceSize(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1);
builder.initialPayloadSerializedBufferCapacity(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1);
+ builder.exportOnRecovery(ExportOnRecovery.Json);
+ builder.recoveryExportBaseDir(DEFAULT_RECOVERY_EXPORT_BASE_DIR + "-new");
DatastoreContext context = builder.build();
assertEquals(DEFAULT_MAX_MESSAGE_SLICE_SIZE + 1, context.getMaximumMessageSliceSize());
assertEquals(DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY + 1,
context.getInitialPayloadSerializedBufferCapacity());
+ assertEquals(DEFAULT_RECOVERY_EXPORT_BASE_DIR + "-new",
+ context.getRecoveryExportBaseDir());
+ assertEquals(ExportOnRecovery.Json, context.getExportOnRecovery());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class JsonExportTest extends AbstractShardTest {
+ private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
+ + "InMemorySnapshotStore and journal recovery seq number will start from 1";
+ private static final String EXPECTED_JOURNAL_FILE = "expectedJournalExport.json";
+ private static final String EXPECTED_SNAPSHOT_FILE = "expectedSnapshotExport.json";
+ private static String actualJournalFilePath;
+ private static String actualSnapshotFilePath;
+ private DatastoreContext datastoreContext;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final File exportTmpFolder = temporaryFolder.newFolder("persistence-export");
+ actualJournalFilePath = exportTmpFolder.getAbsolutePath() + "/journals/"
+ + "member-1-shard-inventory-config" + nextShardNum + "-journal.json";
+ actualSnapshotFilePath = exportTmpFolder.getAbsolutePath() + "/snapshots/"
+ + "member-1-shard-inventory-config" + nextShardNum + "-snapshot.json";
+ datastoreContext = DatastoreContext.newBuilder().shardJournalRecoveryLogBatchSize(1)
+ .shardSnapshotBatchCount(5000).shardHeartbeatIntervalInMillis(HEARTBEAT_MILLIS).persistent(true)
+ .exportOnRecovery(ExportOnRecovery.Json)
+ .recoveryExportBaseDir(exportTmpFolder.getAbsolutePath()).build();
+ }
+
+ @Override
+ protected DatastoreContext newDatastoreContext() {
+ return datastoreContext;
+ }
+
+ @Test
+ public void testJsonExport() throws Exception {
+ // Set up the InMemorySnapshotStore.
+ final DataTree source = setupInMemorySnapshotStore();
+
+ final DataTreeModification writeMod = source.takeSnapshot().newModification();
+ writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ writeMod.ready();
+ InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
+ // Set up the InMemoryJournal.
+ InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
+ payloadForModification(source, writeMod, nextTransactionId())));
+
+ final int nListEntries = 16;
+ final Set<Integer> listEntryKeys = new HashSet<>();
+
+ // Add some ModificationPayload entries
+ for (int i = 1; i <= nListEntries; i++) {
+ listEntryKeys.add(i);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+ final DataTreeModification mod = source.takeSnapshot().newModification();
+ mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+ mod.ready();
+
+ InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
+ payloadForModification(source, mod, nextTransactionId())));
+ }
+
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
+ new ApplyJournalEntries(nListEntries));
+
+ testRecovery(listEntryKeys, false);
+
+ verifyJournalExport();
+ verifySnapshotExport();
+ }
+
+ private static void verifyJournalExport() throws IOException {
+ final String expectedJournalData = readExpectedFile(EXPECTED_JOURNAL_FILE);
+ final String actualJournalData = readActualFile(actualJournalFilePath);
+ assertEquals("Exported journal is not expected ", expectedJournalData, actualJournalData);
+ }
+
+ private static void verifySnapshotExport() throws IOException {
+ final String expectedSnapshotData = readExpectedFile(EXPECTED_SNAPSHOT_FILE);
+ final String actualSnapshotData = readActualFile(actualSnapshotFilePath);
+ assertEquals("Exported snapshot is not expected ", expectedSnapshotData, actualSnapshotData);
+ }
+
+ private static String readExpectedFile(final String filePath) throws IOException {
+ final File exportFile = new File(JsonExportTest.class.getClassLoader().getResource(filePath).getFile());
+ return new String(Files.readAllBytes(Path.of(exportFile.getPath())));
+ }
+
+ private static String readActualFile(final String filePath) throws IOException {
+ final File exportFile = new File(filePath);
+ await().atMost(10, TimeUnit.SECONDS).until(exportFile::exists);
+ return new String(Files.readAllBytes(Path.of(filePath)));
+ }
+}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
new ApplyJournalEntries(nListEntries));
- testRecovery(listEntryKeys);
+ testRecovery(listEntryKeys, true);
}
@Test
--- /dev/null
+{"Entries":[{"Entry":[{"Node":[{"Path":"/"},{"ModificationType":"UNMODIFIED"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=1}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=2}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=3}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=4}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=5}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=6}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=7}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=8}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=9}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=10}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=11}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=12}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=13}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=14}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=15}]"}]}]},{"Entry":[{"Node":[{"Path":"/(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)test/outer-list"},{"ModificationType":"WRITE"},{"Data":"[ImmutableLeafNode{identifier=(urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test?revision=2014-03-13)id, body=16}]"}]}]}]}
\ No newline at end of file
--- /dev/null
+{"odl-datastore-test:test":{}}
\ No newline at end of file