From bb248f15d352cdd69e53ff7756fcb2c62cdc3eac Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tibor=20Kr=C3=A1l?= Date: Fri, 21 Feb 2020 14:57:41 +0100 Subject: [PATCH] Add an option to trigger snapshot creation on root overwrites MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit In some cases (such as DAEXIM import), it does not necessarily make sense to retain previous data in the journal, as all of it has been superseded. JIRA: CONTROLLER-1913 Change-Id: I5d634faac06e6764a417c23e88c728373b900924 Signed-off-by: Tibor Král Signed-off-by: Tomas Cere --- .../controller/cluster/raft/RaftActor.java | 2 +- .../raft/RaftActorSnapshotMessageSupport.java | 2 +- .../cluster/raft/SnapshotManager.java | 36 +++++++-- .../cluster/raft/SnapshotState.java | 8 ++ .../raft/base/messages/CaptureSnapshot.java | 11 ++- .../src/main/resources/initial/datastore.cfg | 3 + .../cluster/datastore/DatastoreContext.java | 12 +++ .../controller/cluster/datastore/Shard.java | 6 +- .../cluster/datastore/ShardDataTree.java | 32 +++++++- .../yang/distributed-datastore-provider.yang | 6 ++ ...ctDistributedDataStoreIntegrationTest.java | 62 +++++++++++++++ ...butedDataStoreRemotingIntegrationTest.java | 79 ++++++++++++++++++- .../module-shards-default-cars-member1.conf | 24 ++++++ 13 files changed, 266 insertions(+), 17 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 494ec98b8f..7148986877 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -902,7 +902,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 3b4c08c405..0fe9a1acc3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -110,7 +110,7 @@ class RaftActorSnapshotMessageSupport { if (context.getPersistenceProvider().isRecoveryApplicable()) { CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot( - context.getReplicatedLog().last(), -1); + context.getReplicatedLog().last(), -1, true); ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot, ImmutableElectionTerm.copyOf(context.getTermInformation()), sender, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 7d9a1bbf5b..71803ccf61 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -91,6 +91,11 @@ public class SnapshotManager implements SnapshotState { return currentState.capture(lastLogEntry, replicatedToAllIndex); } + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex); + } + @Override public void apply(final ApplySnapshot snapshot) { currentState.apply(snapshot); @@ -154,7 +159,8 @@ public class SnapshotManager implements SnapshotState { * @param replicatedToAllIndex the index of the last entry replicated to all followers. * @return a new CaptureSnapshot instance. */ - public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { + public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, + final boolean mandatoryTrim) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry, hasFollowers()); @@ -186,7 +192,7 @@ public class SnapshotManager implements SnapshotState { } return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim); } private class AbstractSnapshotState implements SnapshotState { @@ -209,6 +215,12 @@ public class SnapshotManager implements SnapshotState { return false; } + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + log.debug("captureWithForcedTrim should not be called in state {}", this); + return false; + } + @Override public void apply(final ApplySnapshot snapshot) { log.debug("apply should not be called in state {}", this); @@ -279,8 +291,8 @@ public class SnapshotManager implements SnapshotState { @SuppressWarnings("checkstyle:IllegalCatch") private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, - final String targetFollower) { - captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex); + final String targetFollower, boolean mandatoryTrim) { + captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim); OutputStream installSnapshotStream = null; if (targetFollower != null) { @@ -310,13 +322,18 @@ public class SnapshotManager implements SnapshotState { @Override public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) { - return capture(lastLogEntry, replicatedToAllIndex, null); + return capture(lastLogEntry, replicatedToAllIndex, null, false); } @Override public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex, final String targetFollower) { - return capture(lastLogEntry, replicatedToAllIndex, targetFollower); + return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false); + } + + @Override + public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + return capture(lastLogEntry, replicatedToAllIndex, null, true); } @Override @@ -369,17 +386,20 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { + if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) { if (log.isDebugEnabled()) { if (dataSizeThresholdExceeded) { log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit " + "with index {}", context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, captureSnapshot.getLastAppliedIndex()); - } else { + } else if (logSizeExceededSnapshotBatchCount) { log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with " + "index {}", context.getId(), context.getReplicatedLog().size(), context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + } else { + log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to" + + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 0a702741d8..acb6e01230 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -46,6 +46,14 @@ public interface SnapshotState { */ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); + /** + * Initiates a capture snapshot, while enforcing trimming of the log up to lastAppliedIndex. + * @param lastLogEntry the last entry in the replicated log + * @param replicatedToAllIndex the current replicatedToAllIndex + * @return true if capture was started + */ + boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + /** * Applies a snapshot on a follower that was installed by the leader. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index b06f8f295e..0fd48edf81 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -21,10 +21,11 @@ public class CaptureSnapshot implements ControlMessage { private final long replicatedToAllIndex; private final long replicatedToAllTerm; private final List unAppliedEntries; + private final boolean mandatoryTrim; public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, - List unAppliedEntries) { + List unAppliedEntries, boolean mandatoryTrim) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; @@ -33,6 +34,7 @@ public class CaptureSnapshot implements ControlMessage { this.replicatedToAllTerm = replicatedToAllTerm; this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.emptyList(); + this.mandatoryTrim = mandatoryTrim; } public long getLastAppliedIndex() { @@ -63,6 +65,10 @@ public class CaptureSnapshot implements ControlMessage { return unAppliedEntries; } + public boolean isMandatoryTrim() { + return mandatoryTrim; + } + @Override public String toString() { return "CaptureSnapshot [lastAppliedIndex=" + lastAppliedIndex @@ -72,7 +78,8 @@ public class CaptureSnapshot implements ControlMessage { + ", installSnapshotInitiated=" + ", replicatedToAllIndex=" + replicatedToAllIndex + ", replicatedToAllTerm=" + replicatedToAllTerm - + ", unAppliedEntries size=" + unAppliedEntries.size() + "]"; + + ", unAppliedEntries size=" + unAppliedEntries.size() + + ", mandatoryTrim=" + mandatoryTrim + "]"; } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index 608ea328c5..f1374b11d7 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -120,3 +120,6 @@ operational.persistent=false #Interval after which a snapshot should be taken during the recovery process. #recovery-snapshot-interval-seconds=0 + +# Option to take a snapshot when the entire DataTree root or top-level container is overwritten +snapshot-on-root-overwrite=false \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 4bd3974557..43af39ff31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -53,6 +53,7 @@ public class DatastoreContext implements ClientActorConfig { public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3; public static final boolean DEFAULT_PERSISTENT = true; + public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false; public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader(); public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; @@ -82,6 +83,7 @@ public class DatastoreContext implements ClientActorConfig { private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER; private boolean persistent = DEFAULT_PERSISTENT; + private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE; private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; @@ -126,6 +128,7 @@ public class DatastoreContext implements ClientActorConfig { this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout; this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier; this.persistent = other.persistent; + this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite; this.configurationReader = other.configurationReader; this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; this.dataStoreName = other.dataStoreName; @@ -213,6 +216,10 @@ public class DatastoreContext implements ClientActorConfig { return persistent; } + public boolean isSnapshotOnRootOverwrite() { + return this.snapshotOnRootOverwrite; + } + public AkkaConfigurationReader getConfigurationReader() { return configurationReader; } @@ -481,6 +488,11 @@ public class DatastoreContext implements ClientActorConfig { return this; } + public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) { + datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite; + return this; + } + public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) { datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fef676a601..e051953f95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -223,10 +223,12 @@ public class Shard extends RaftActor { new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, + frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, + frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 450de78c67..b2549eaf4d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static akka.actor.ActorRef.noSender; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; @@ -67,6 +68,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; @@ -85,6 +87,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @@ -387,7 +390,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private void applyReplicatedCandidate(final CommitTransactionPayload payload) throws DataValidationFailedException, IOException { - final Entry entry = payload.acquireCandidate(); + final Entry entry = payload.getCandidate(); final TransactionIdentifier identifier = entry.getKey(); LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); @@ -439,6 +442,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { applyReplicatedCandidate((CommitTransactionPayload) payload); } } + + // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed. + checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue() + .getCandidate()); } else if (payload instanceof AbortTransactionPayload) { if (identifier != null) { payloadReplicationComplete((AbortTransactionPayload) payload); @@ -469,6 +476,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } + private void checkRootOverwrite(DataTreeCandidate candidate) { + final DatastoreContext datastoreContext = shard.getDatastoreContext(); + if (!datastoreContext.isSnapshotOnRootOverwrite()) { + return; + } + + if (!datastoreContext.isPersistent()) { + return; + } + + if (candidate.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) { + return; + } + + // top level container ie "/" + if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty()) + && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) { + LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext); + shard.self().tell(new InitiateCaptureSnapshot(), noSender()); + return; + } + } + private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) { if (callback != null) { replicationCallbacks.put(payload, callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index e2e146c7c5..acb22256ce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -184,6 +184,12 @@ module distributed-datastore-provider { description "Enable or disable data persistence"; } + leaf snapshotOnRootOverwrite { + default false; + type boolean; + description "Enable or disable capturing snapshots on DataTree root overwrites"; + } + leaf shard-isolated-leader-check-interval-in-millis { default 5000; type heartbeat-interval-type; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index a6b4d912cf..f530d673a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -56,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -84,6 +86,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguratio import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; public abstract class AbstractDistributedDataStoreIntegrationTest { @@ -906,4 +909,63 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { assertEquals("Data node", peopleNode, optional.get()); } } + + @Test + @Ignore("Writes to root node are not split into shards") + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), + datastoreContextBuilder.snapshotOnRootOverwrite(true)); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf", + true, "cars", "default")) { + + ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild((ContainerNode) CarsModel.create()) + .build(); + + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + // root has been written expect snapshot at index 0 + verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1); + + for (int i = 0; i < 10; i++) { + testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 0 + verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1); + + // root overwrite so expect a snapshot + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1 + 10 + 1) + IntegrationTestKit.verifyShardState(dataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testRootOverwrite", 11, 1); + } + } + + private void verifySnapshot(String persistenceId, long lastAppliedIndex, long lastAppliedTerm) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm()); + } + ); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0c74c71d83..8637a5f17d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -124,6 +124,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -221,12 +222,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); @@ -1435,6 +1442,74 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { executor.shutdownNow(); } + @Test + @Ignore("Writes to root node are not split into shards") + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final String testName = "testSnapshotOnRootOverwrite"; + String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1.conf", shards, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild((ContainerNode) CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(0, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(9, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 0 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(11, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11); + } + + private void verifySnapshot(String persistenceId, long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf new file mode 100644 index 0000000000..109ff58f7e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf @@ -0,0 +1,24 @@ +module-shards = [ + { + name = "default" + shards = [ + { + name="default", + replicas = [ + "member-1", + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1" + ] + } + ] + } +] \ No newline at end of file -- 2.36.6