import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
private static final String[] CARS = {"cars"};
private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
- "akka.tcp://cluster-test@127.0.0.1:2558");
+ "akka://cluster-test@127.0.0.1:2558");
private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
- "akka.tcp://cluster-test@127.0.0.1:2559");
+ "akka://cluster-test@127.0.0.1:2559");
private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
private final TransactionIdentifier tx1 = nextTransactionId();
private final TransactionIdentifier tx2 = nextTransactionId();
- private DistributedDataStore followerDistributedDataStore;
- private DistributedDataStore leaderDistributedDataStore;
+ private AbstractDataStore followerDistributedDataStore;
+ private AbstractDataStore leaderDistributedDataStore;
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
@Before
public void setUp() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
JavaTestKit.shutdownActorSystem(follower2System);
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
- private void initDatastoresWithCars(String type) {
+ private void initDatastoresWithCars(final String type) {
initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
}
- private void initDatastoresWithCarsAndPeople(String type) {
+ private void initDatastoresWithCarsAndPeople(final String type) {
initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
}
- private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
+ private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
}
- private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
+ private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
+ throws Exception {
Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, optional.isPresent());
assertEquals("Car list node", listBuilder.build(), optional.get());
}
- private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path,
- NormalizedNode<?, ?> expNode) throws Exception {
+ private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> expNode) throws Exception {
Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, optional.isPresent());
assertEquals("Data node", expNode, optional.get());
}
- private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
+ private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
+ throws Exception {
Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
assertEquals("exists", true, exists);
}
initDatastoresWithCars(testName);
String followerCarShardName = "member-2-shard-cars-" + testName;
- InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
+ InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
- try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+ try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
.setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
}
initDatastoresWithCars(testName);
String followerCarShardName = "member-2-shard-cars-" + testName;
- InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
+ InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
// Write top-level car container from the follower so it uses a remote Tx.
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
- try (DistributedDataStore ds =
+ try (AbstractDataStore ds =
newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
String testName = "testLeadershipTransferOnShutdown";
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
- IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
- try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+ IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
+ try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
// Create and submit a couple tx's so they're pending.
@Test(expected = AskTimeoutException.class)
public void testTransactionWithShardLeaderNotResponding() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
// Do an initial read to get the primary shard info cached.
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
- try (DistributedDataStore ds =
+ try (AbstractDataStore ds =
follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
followerTestKit.waitForMembersUp("member-1", "member-3");
}
}
- private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+ @Test
+ public void testInstallSnapshot() throws Exception {
+ final String testName = "testInstallSnapshot";
+ final String leaderCarShardName = "member-1-shard-cars-" + testName;
+ final String followerCarShardName = "member-2-shard-cars-" + testName;
+
+ // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
+ // install a snapshot to sync the follower.
+
+ TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+ tree.setSchemaContext(SchemaContextHelper.full());
+
+ ContainerNode carsNode = CarsModel.newCarsNode(
+ CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+ AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
+
+ NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+ Snapshot initialSnapshot = Snapshot.create(
+ new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
+ Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
+ InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
+ InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
+
+ initDatastoresWithCars(testName);
+
+ Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
+ CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, readOptional.isPresent());
+ assertEquals("Node", carsNode, readOptional.get());
+
+ verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
+ initialSnapshot, snapshotRoot);
+
+ verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
+ initialSnapshot, snapshotRoot);
+ }
+
+ private static void verifySnapshot(Snapshot actual, Snapshot expected, NormalizedNode<?, ?> expRoot) {
+ assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+ assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+ assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
+ assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
+ assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
+ MetadataShardDataTreeSnapshot shardSnapshot =
+ (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
+ assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+ }
+
+ private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
Answer<DatastoreContext> answer = invocation -> newBuilder.build();