Add an option to trigger snapshot creation on root overwrites 43/87943/17
authorTibor Král <tibor.kral@pantheon.tech>
Fri, 21 Feb 2020 13:57:41 +0000 (14:57 +0100)
committerTomas Cere <tomas.cere@pantheon.tech>
Wed, 24 Jun 2020 12:35:00 +0000 (14:35 +0200)
In some cases (such as DAEXIM import), it does not necessarily
make sense to retain previous data in the journal, as all of it
has been superseded.

JIRA: CONTROLLER-1913
Change-Id: I5d634faac06e6764a417c23e88c728373b900924
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf [new file with mode: 0644]

index 494ec98..7148986 100644 (file)
@@ -902,7 +902,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
                 replicatedLog().last(), idx);
 
-            snapshotManager.capture(replicatedLog().last(), idx);
+            snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
         }
     }
 
index 3b4c08c..0fe9a1a 100644 (file)
@@ -110,7 +110,7 @@ class RaftActorSnapshotMessageSupport {
 
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
-                    context.getReplicatedLog().last(), -1);
+                    context.getReplicatedLog().last(), -1, true);
 
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
                     ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
index 7d9a1bb..71803cc 100644 (file)
@@ -91,6 +91,11 @@ public class SnapshotManager implements SnapshotState {
         return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
+    @Override
+    public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex);
+    }
+
     @Override
     public void apply(final ApplySnapshot snapshot) {
         currentState.apply(snapshot);
@@ -154,7 +159,8 @@ public class SnapshotManager implements SnapshotState {
      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
      * @return a new CaptureSnapshot instance.
      */
-    public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
+    public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
+                                              final boolean mandatoryTrim) {
         TermInformationReader lastAppliedTermInfoReader =
                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                         lastLogEntry, hasFollowers());
@@ -186,7 +192,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim);
     }
 
     private class AbstractSnapshotState implements SnapshotState {
@@ -209,6 +215,12 @@ public class SnapshotManager implements SnapshotState {
             return false;
         }
 
+        @Override
+        public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            log.debug("captureWithForcedTrim should not be called in state {}", this);
+            return false;
+        }
+
         @Override
         public void apply(final ApplySnapshot snapshot) {
             log.debug("apply should not be called in state {}", this);
@@ -279,8 +291,8 @@ public class SnapshotManager implements SnapshotState {
 
         @SuppressWarnings("checkstyle:IllegalCatch")
         private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
-                final String targetFollower) {
-            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
+                final String targetFollower, boolean mandatoryTrim) {
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim);
 
             OutputStream installSnapshotStream = null;
             if (targetFollower != null) {
@@ -310,13 +322,18 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
-            return capture(lastLogEntry, replicatedToAllIndex, null);
+            return capture(lastLogEntry, replicatedToAllIndex, null, false);
         }
 
         @Override
         public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
                 final String targetFollower) {
-            return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+            return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false);
+        }
+
+        @Override
+        public boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            return capture(lastLogEntry, replicatedToAllIndex, null, true);
         }
 
         @Override
@@ -369,17 +386,20 @@ public class SnapshotManager implements SnapshotState {
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
 
             final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-            if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
+            if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) {
                 if (log.isDebugEnabled()) {
                     if (dataSizeThresholdExceeded) {
                         log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
                                 + "with index {}", context.getId(), context.getReplicatedLog().dataSize(),
                                 dataThreshold, captureSnapshot.getLastAppliedIndex());
-                    } else {
+                    } else if (logSizeExceededSnapshotBatchCount) {
                         log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
                                 + "index {}", context.getId(), context.getReplicatedLog().size(),
                                 context.getConfigParams().getSnapshotBatchCount(),
                                 captureSnapshot.getLastAppliedIndex());
+                    } else {
+                        log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to"
+                                + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex());
                     }
                 }
 
index 0a70274..acb6e01 100644 (file)
@@ -46,6 +46,14 @@ public interface SnapshotState {
      */
     boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
+    /**
+     * Initiates a capture snapshot, while enforcing trimming of the log up to lastAppliedIndex.
+     * @param lastLogEntry the last entry in the replicated log
+     * @param replicatedToAllIndex the current replicatedToAllIndex
+     * @return true if capture was started
+     */
+    boolean captureWithForcedTrim(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
     /**
      * Applies a snapshot on a follower that was installed by the leader.
      *
index b06f8f2..0fd48ed 100644 (file)
@@ -21,10 +21,11 @@ public class CaptureSnapshot implements ControlMessage {
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
     private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final boolean mandatoryTrim;
 
     public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
             long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
-            List<ReplicatedLogEntry> unAppliedEntries) {
+            List<ReplicatedLogEntry> unAppliedEntries, boolean mandatoryTrim) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
@@ -33,6 +34,7 @@ public class CaptureSnapshot implements ControlMessage {
         this.replicatedToAllTerm = replicatedToAllTerm;
         this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries :
             Collections.<ReplicatedLogEntry>emptyList();
+        this.mandatoryTrim = mandatoryTrim;
     }
 
     public long getLastAppliedIndex() {
@@ -63,6 +65,10 @@ public class CaptureSnapshot implements ControlMessage {
         return unAppliedEntries;
     }
 
+    public boolean isMandatoryTrim() {
+        return mandatoryTrim;
+    }
+
     @Override
     public String toString() {
         return "CaptureSnapshot [lastAppliedIndex=" + lastAppliedIndex
@@ -72,7 +78,8 @@ public class CaptureSnapshot implements ControlMessage {
                 + ", installSnapshotInitiated="
                 + ", replicatedToAllIndex=" + replicatedToAllIndex
                 + ", replicatedToAllTerm=" + replicatedToAllTerm
-                + ", unAppliedEntries size=" + unAppliedEntries.size() + "]";
+                + ", unAppliedEntries size=" + unAppliedEntries.size()
+                + ", mandatoryTrim=" + mandatoryTrim + "]";
     }
 
 
index 608ea32..f1374b1 100644 (file)
@@ -120,3 +120,6 @@ operational.persistent=false
 
 #Interval after which a snapshot should be taken during the recovery process.
 #recovery-snapshot-interval-seconds=0
+
+# Option to take a snapshot when the entire DataTree root or top-level container is overwritten
+snapshot-on-root-overwrite=false
\ No newline at end of file
index 4bd3974..43af39f 100644 (file)
@@ -53,6 +53,7 @@ public class DatastoreContext implements ClientActorConfig {
     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
     public static final int DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER = 3;
     public static final boolean DEFAULT_PERSISTENT = true;
+    public static final boolean DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE = false;
     public static final FileAkkaConfigurationReader DEFAULT_CONFIGURATION_READER = new FileAkkaConfigurationReader();
     public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
@@ -82,6 +83,7 @@ public class DatastoreContext implements ClientActorConfig {
     private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
     private int initialSettleTimeoutMultiplier = DEFAULT_INITIAL_SETTLE_TIMEOUT_MULTIPLIER;
     private boolean persistent = DEFAULT_PERSISTENT;
+    private boolean snapshotOnRootOverwrite = DEFAULT_SNAPSHOT_ON_ROOT_OVERWRITE;
     private AkkaConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
     private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
@@ -126,6 +128,7 @@ public class DatastoreContext implements ClientActorConfig {
         this.shardLeaderElectionTimeout = other.shardLeaderElectionTimeout;
         this.initialSettleTimeoutMultiplier = other.initialSettleTimeoutMultiplier;
         this.persistent = other.persistent;
+        this.snapshotOnRootOverwrite = other.snapshotOnRootOverwrite;
         this.configurationReader = other.configurationReader;
         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
         this.dataStoreName = other.dataStoreName;
@@ -213,6 +216,10 @@ public class DatastoreContext implements ClientActorConfig {
         return persistent;
     }
 
+    public boolean isSnapshotOnRootOverwrite() {
+        return this.snapshotOnRootOverwrite;
+    }
+
     public AkkaConfigurationReader getConfigurationReader() {
         return configurationReader;
     }
@@ -481,6 +488,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder snapshotOnRootOverwrite(final boolean snapshotOnRootOverwrite) {
+            datastoreContext.snapshotOnRootOverwrite = snapshotOnRootOverwrite;
+            return this;
+        }
+
         public Builder shardIsolatedLeaderCheckIntervalInMillis(final int shardIsolatedLeaderCheckIntervalInMillis) {
             datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
index fef676a..e051953 100644 (file)
@@ -223,10 +223,12 @@ public class Shard extends RaftActor {
                 new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, name, frontendMetadata);
+                    treeChangeListenerPublisher, name,
+                    frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name,
+                    frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
index 450de78..b2549ea 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.actor.ActorRef.noSender;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
@@ -67,6 +68,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
@@ -85,6 +87,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
@@ -387,7 +390,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private void applyReplicatedCandidate(final CommitTransactionPayload payload)
             throws DataValidationFailedException, IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
+        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
         final TransactionIdentifier identifier = entry.getKey();
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -439,6 +442,10 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     applyReplicatedCandidate((CommitTransactionPayload) payload);
                 }
             }
+
+            // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
+            checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
+                    .getCandidate());
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -469,6 +476,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    private void checkRootOverwrite(DataTreeCandidate candidate) {
+        final DatastoreContext datastoreContext = shard.getDatastoreContext();
+        if (!datastoreContext.isSnapshotOnRootOverwrite()) {
+            return;
+        }
+
+        if (!datastoreContext.isPersistent()) {
+            return;
+        }
+
+        if (candidate.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
+            return;
+        }
+
+        // top level container ie "/"
+        if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty())
+                && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) {
+            LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
+            shard.self().tell(new InitiateCaptureSnapshot(), noSender());
+            return;
+        }
+    }
+
     private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
         if (callback != null) {
             replicationCallbacks.put(payload, callback);
index e2e146c..acb2225 100644 (file)
@@ -184,6 +184,12 @@ module distributed-datastore-provider {
             description "Enable or disable data persistence";
         }
 
+        leaf snapshotOnRootOverwrite {
+            default false;
+            type boolean;
+            description "Enable or disable capturing snapshots on DataTree root overwrites";
+        }
+
         leaf shard-isolated-leader-check-interval-in-millis {
             default 5000;
             type heartbeat-interval-type;
index a6b4d91..f530d67 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -56,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -84,6 +86,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguratio
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public abstract class AbstractDistributedDataStoreIntegrationTest {
 
@@ -906,4 +909,63 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             assertEquals("Data node", peopleNode, optional.get());
         }
     }
+
+    @Test
+    @Ignore("Writes to root node are not split into shards")
+    public void testSnapshotOnRootOverwrite() throws Exception {
+        if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
+            // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
+            return;
+        }
+
+        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
+                datastoreContextBuilder.snapshotOnRootOverwrite(true));
+        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
+                testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
+                true, "cars", "default")) {
+
+            ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
+                    .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+                    .withChild((ContainerNode) CarsModel.create())
+                    .build();
+
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(0, state.getSnapshotIndex()));
+
+            // root has been written expect snapshot at index 0
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1);
+
+            for (int i = 0; i < 10; i++) {
+                testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
+                    CarsModel.newCarEntry("car " + i, Uint64.ONE));
+            }
+
+            // fake snapshot causes the snapshotIndex to move
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(9, state.getSnapshotIndex()));
+
+            // however the real snapshot still has not changed and was taken at index 0
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 0, 1);
+
+            // root overwrite so expect a snapshot
+            testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+
+            // this was a real snapshot so everything should be in it(1 + 10 + 1)
+            IntegrationTestKit.verifyShardState(dataStore, "cars",
+                state -> assertEquals(11, state.getSnapshotIndex()));
+
+            verifySnapshot("member-1-shard-cars-testRootOverwrite", 11, 1);
+        }
+    }
+
+    private void verifySnapshot(String persistenceId, long lastAppliedIndex, long lastAppliedTerm) {
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+                assertEquals(1, snap.size());
+                assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
+                assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
+            }
+        );
+    }
 }
index 0c74c71..8637a5f 100644 (file)
@@ -124,6 +124,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -221,12 +222,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
             throws Exception {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
+        initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder,
+                followerDatastoreContextBuilder);
+    }
+
+    private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards,
+            DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
 
         leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
+        followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
         followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
@@ -1435,6 +1442,74 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         executor.shutdownNow();
     }
 
+    @Test
+    @Ignore("Writes to root node are not split into shards")
+    public void testSnapshotOnRootOverwrite() throws Exception {
+        if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
+            // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
+            return;
+        }
+
+        final String testName = "testSnapshotOnRootOverwrite";
+        String[] shards = {"cars", "default"};
+        initDatastores(testName, "module-shards-default-cars-member1.conf", shards,
+                leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
+                followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
+
+        leaderTestKit.waitForMembersUp("member-2");
+        ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+                .withChild((ContainerNode) CarsModel.create())
+                .build();
+
+        leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
+
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(0, state.getSnapshotIndex()));
+
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(0, state.getSnapshotIndex()));
+
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
+
+        for (int i = 0; i < 10; i++) {
+            leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i),
+                    CarsModel.newCarEntry("car " + i, Uint64.ONE));
+        }
+
+        // fake snapshot causes the snapshotIndex to move
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(9, state.getSnapshotIndex()));
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(9, state.getSnapshotIndex()));
+
+        // however the real snapshot still has not changed and was taken at index 0
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
+
+        // root overwrite so expect a snapshot
+        leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
+
+        // this was a real snapshot so everything should be in it(1 + 10 + 1)
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
+            state -> assertEquals(11, state.getSnapshotIndex()));
+        IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
+            state -> assertEquals(11, state.getSnapshotIndex()));
+
+        verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11);
+        verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11);
+    }
+
+    private void verifySnapshot(String persistenceId, long lastAppliedIndex) {
+        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+                assertEquals(1, snap.size());
+                assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
+            }
+        );
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-cars-member1.conf
new file mode 100644 (file)
index 0000000..109ff58
--- /dev/null
@@ -0,0 +1,24 @@
+module-shards = [
+    {
+        name = "default"
+        shards = [
+            {
+                name="default",
+                replicas = [
+                    "member-1",
+                ]
+            }
+        ]
+    },
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    }
+]
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.