From: Tibor Král Date: Fri, 21 Feb 2020 13:57:41 +0000 (+0100) Subject: Add an option to trigger snapshot creation on root overwrites X-Git-Tag: v2.0.3~22 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=bb248f15d352cdd69e53ff7756fcb2c62cdc3eac Add an option to trigger snapshot creation on root overwrites In some cases (such as DAEXIM import), it does not necessarily make sense to retain previous data in the journal, as all of it has been superseded. JIRA: CONTROLLER-1913 Change-Id: I5d634faac06e6764a417c23e88c728373b900924 Signed-off-by: Tibor Král Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 494ec98b8f..7148986877 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -902,7 +902,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 3b4c08c405..0fe9a1acc3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -110,7 +110,7 @@ class RaftActorSnapshotMessageSupport { if (context.getPersistenceProvider().isRecoveryApplicable()) { CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot( - context.getReplicatedLog().last(), -1); + context.getReplicatedLog().last(), -1, true); ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 7d9a1bbf5b..71803ccf61 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -91,6 +91,11 @@ public class SnapshotManager implements SnapshotState { return currentState.capture(lastLogEntry, replicatedToAllIndex); } + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex); + } + @Override public void apply(final ApplySnapshot snapshot) { currentState.apply(snapshot); @@ -154,7 +159,8 @@ public class SnapshotManager implements SnapshotState { * @param replicatedToAllIndex the index of the last entry replicated to all followers. * @return a new CaptureSnapshot instance. */ - public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + final boolean mandatoryTrim) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry, hasFollowers()); @@ -186,7 +192,7 @@ public class SnapshotManager implements SnapshotState { } return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim); } private class AbstractSnapshotState implements SnapshotState { @@ -209,6 +215,12 @@ public class SnapshotManager implements SnapshotState { return false; } + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + log.debug("captureWithForcedTrim should not be called in state {}", this); + return false; + } + @Override public void apply(final ApplySnapshot snapshot) { log.debug("apply should not be called in state {}", this); @@ -279,8 +291,8 @@ public class SnapshotManager implements SnapshotState { @SuppressWarnings("checkstyle:IllegalCatch") private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, - final String targetFollower) { - captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex); + final String targetFollower, boolean mandatoryTrim) { + captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim); OutputStream installSnapshotStream = null; if (targetFollower != null) { @@ -310,13 +322,18 @@ public class SnapshotManager implements SnapshotState { @Override public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { - return capture(lastLogEntry, replicatedToAllIndex, null); + return capture(lastLogEntry, replicatedToAllIndex, null, false); } @Override public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { - return capture(lastLogEntry, replicatedToAllIndex, targetFollower); + return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false); + } + + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null, true); } @Override @@ -369,17 +386,20 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { + if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) { if (log.isDebugEnabled()) { if (dataSizeThresholdExceeded) { log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit " + "with index {}", context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, captureSnapshot.getLastAppliedIndex()); - } else { + } else if (logSizeExceededSnapshotBatchCount) { log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with " + "index {}", context.getId(), context.getReplicatedLog().size(), context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + } else { + log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to" + + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 0a702741d8..acb6e01230 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -46,6 +46,14 @@ public interface SnapshotState { */ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); + /** + * Initiates a capture snapshot, while enforcing trimming of the log up to lastAppliedIndex. + * @param lastLogEntry the last entry in the replicated log + * @param replicatedToAllIndex the current replicatedToAllIndex + * @return true if capture was started + */ + boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + /** * Applies a snapshot on a follower that was installed by the leader. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index b06f8f295e..0fd48edf81 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -21,10 +21,11 @@ public class CaptureSnapshot implements ControlMessage { private final long replicatedToAllIndex; private final long replicatedToAllTerm; private final List unAppliedEntries; + private final boolean mandatoryTrim; public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, - List unAppliedEntries) { + List unAppliedEntries, boolean mandatoryTrim) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; @@ -33,6 +34,7 @@ public class CaptureSnapshot implements ControlMessage { this.replicatedToAllTerm = replicatedToAllTerm; this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.emptyList(); + this.mandatoryTrim = mandatoryTrim; } public long getLastAppliedIndex() { @@ -63,6 +65,10 @@ public class CaptureSnapshot implements ControlMessage { return unAppliedEntries; } + public boolean isMandatoryTrim() { + return mandatoryTrim; + } + @Override public String toString() { return "CaptureSnapshot [lastAppliedIndex=" + lastAppliedIndex @@ -72,7 +78,8 @@ public class CaptureSnapshot implements ControlMessage { + ", installSnapshotInitiated=" + ", replicatedToAllIndex=" + replicatedToAllIndex + ", replicatedToAllTerm=" + replicatedToAllTerm - + ", unAppliedEntries size=" + unAppliedEntries.size() + "]"; + + ", unAppliedEntries size=" + unAppliedEntries.size() + + ", mandatoryTrim=" + mandatoryTrim + "]"; } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 608ea328c5..f1374b11d7 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -120,3 +120,6 @@ operational.persistent=false #Interval after which a snapshot should be taken during the recovery process. #recovery-snapshot-interval-seconds=0 + +# Option to take a snapshot when the entire DataTree root or top-level container is overwritten +snapshot-on-root-overwrite=false \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 4bd3974557..43af39ff31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -53,6 +53,7 @@ public class DatastoreContext implements ClientActorConfig { public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3; public static final boolean DEFAULT_PERSISTENT = true; + public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false; public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; @@ -82,6 +83,7 @@ public class DatastoreContext implements ClientActorConfig { private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER; private boolean persistent = DEFAULT_PERSISTENT; + private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; @@ -126,6 +128,7 @@ public class DatastoreContext implements ClientActorConfig { this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier; this.persistent = other.persistent; + this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; this.dataStoreName = other.dataStoreName; @@ -213,6 +216,10 @@ public class DatastoreContext implements ClientActorConfig { return persistent; } + public boolean isSnapshotOnRootOverwrite() { + return this.snapshotOnRootOverwrite; + } + public AkkaConfigurationReader getConfigurationReader() { return configurationReader; } @@ -481,6 +488,11 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) { + datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite; + return this; + } + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fef676a601..e051953f95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -223,10 +223,12 @@ public class Shard extends RaftActor { new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, + frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, + frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 450de78c67..b2549eaf4d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; @@ -67,6 +68,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; @@ -85,6 +87,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @@ -387,7 +390,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.acquireCandidate(); + final Entry entry = payload.getCandidate(); final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -439,6 +442,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } } + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue() + .getCandidate()); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -469,6 +476,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void checkRootOverwrite(DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + return; + } + + if (candidate.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) { + return; + } + + // top level container ie "/" + if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty()) + && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + return; + } + } + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index e2e146c7c5..acb22256ce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -184,6 +184,12 @@ module distributed-datastore-provider { description "Enable or disable data persistence"; } + leaf snapshotOnRootOverwrite { + default false; + type boolean; + description "Enable or disable capturing snapshots on DataTree root overwrites"; + } + leaf shard-isolated-leader-check-interval-in-millis { default 5000; type heartbeat-interval-type; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index a6b4d912cf..f530d673a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -56,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -84,6 +86,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguratio import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; public abstract class AbstractDistributedDataStoreIntegrationTest { @@ -906,4 +909,63 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { assertEquals("Data node", peopleNode, optional.get()); } } + + @Test + @Ignore("Writes to root node are not split into shards") + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), + datastoreContextBuilder.snapshotOnRootOverwrite(true)); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf", + true, "cars", "default")) { + + ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild((ContainerNode) CarsModel.create()) + .build(); + + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + // root has been written expect snapshot at index 0 + verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1); + + for (int i = 0; i < 10; i++) { + testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 0 + verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1); + + // root overwrite so expect a snapshot + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1 + 10 + 1) + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testRootOverwrite", 11, 1); + } + } + + private void verifySnapshot(String persistenceId, long lastAppliedIndex, long lastAppliedTerm) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm()); + } + ); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0c74c71d83..8637a5f17d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -124,6 +124,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -221,12 +222,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); @@ -1435,6 +1442,74 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { executor.shutdownNow(); } + @Test + @Ignore("Writes to root node are not split into shards") + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final String testName = "testSnapshotOnRootOverwrite"; + String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1.conf", shards, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild((ContainerNode) CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 0 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11); + } + + private void verifySnapshot(String persistenceId, long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf new file mode 100644 index 0000000000..109ff58f7e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf @@ -0,0 +1,24 @@ +module-shards = [ + { + name = "default" + shards = [ + { + name="default", + replicas = [ + "member-1", + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1" + ] + } + ] + } +] \ No newline at end of file