+ final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+ ShardStrategyFactory.setConfiguration(configuration);
+
+
+
+ new JavaTestKit(getSystem()) {
+ {
+
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+ try {
+ final DistributedDataStore distributedDataStore =
+ new DistributedDataStore(getSystem(), "config",
+ new MockClusterWrapper(), configuration,
+ new DatastoreContext());
+
+ distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+ .message("Switching from state Candidate to Leader")
+ .occurrences(1).exec();
+
+ assertEquals(true, result);
+
+ DOMStoreReadWriteTransaction transaction =
+ distributedDataStore.newReadWriteTransaction();
+
+ transaction
+ .write(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME));
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>>
+ future =
+ transaction.read(TestModel.TEST_PATH);
+
+ Optional<NormalizedNode<?, ?>> optional =
+ future.get();
+
+ Assert.assertTrue("Node not found", optional.isPresent());
+
+ NormalizedNode<?, ?> normalizedNode =
+ optional.get();
+
+ assertEquals(TestModel.TEST_QNAME,
+ normalizedNode.getNodeType());
+
+ DOMStoreThreePhaseCommitCohort ready =
+ transaction.ready();
+
+ ListenableFuture<Boolean> canCommit =
+ ready.canCommit();
+
+ assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+
+ ListenableFuture<Void> preCommit =
+ ready.preCommit();
+
+ preCommit.get(5, TimeUnit.SECONDS);
+
+ ListenableFuture<Void> commit = ready.commit();
+
+ commit.get(5, TimeUnit.SECONDS);
+ } catch (ExecutionException | TimeoutException | InterruptedException e){
+ fail(e.getMessage());
+ }
+ }
+ };
+ }
+ };
+
+ }
+
+ @Test
+ public void transactionChainIntegrationTest() throws Exception {
+ final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+ ShardStrategyFactory.setConfiguration(configuration);
+
+
+
+ new JavaTestKit(getSystem()) {
+ {
+
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+ try {
+ final DistributedDataStore distributedDataStore =
+ new DistributedDataStore(getSystem(), "config",
+ new MockClusterWrapper(), configuration,
+ new DatastoreContext());
+
+ distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+ .message("Switching from state Candidate to Leader")
+ .occurrences(1).exec();
+
+ assertEquals(true, result);
+
+ DOMStoreTransactionChain transactionChain =
+ distributedDataStore.createTransactionChain();
+
+ DOMStoreReadWriteTransaction transaction =
+ transactionChain.newReadWriteTransaction();
+
+ transaction
+ .write(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME));
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>>
+ future =
+ transaction.read(TestModel.TEST_PATH);
+
+ Optional<NormalizedNode<?, ?>> optional =
+ future.get();
+
+ Assert.assertTrue("Node not found", optional.isPresent());
+
+ NormalizedNode<?, ?> normalizedNode =
+ optional.get();
+
+ assertEquals(TestModel.TEST_QNAME,
+ normalizedNode.getNodeType());
+
+ DOMStoreThreePhaseCommitCohort ready =
+ transaction.ready();
+
+ ListenableFuture<Boolean> canCommit =
+ ready.canCommit();
+
+ assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+
+ ListenableFuture<Void> preCommit =
+ ready.preCommit();
+
+ preCommit.get(5, TimeUnit.SECONDS);
+
+ ListenableFuture<Void> commit = ready.commit();
+
+ commit.get(5, TimeUnit.SECONDS);
+
+ transactionChain.close();
+ } catch (ExecutionException | TimeoutException | InterruptedException e){
+ fail(e.getMessage());
+ }
+ }
+ };
+ }
+ };
+
+ }
+
+
+ //FIXME : Disabling test because it's flaky
+ //@Test
+ public void integrationTestWithMultiShardConfiguration()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+
+ ShardStrategyFactory.setConfiguration(configuration);
+
+ new JavaTestKit(getSystem()) {
+ {
+
+ new Within(duration("10 seconds")) {
+ @Override
+ protected void run() {
+ try {
+ final DistributedDataStore distributedDataStore =
+ new DistributedDataStore(getSystem(), "config",
+ new MockClusterWrapper(), configuration, null);
+
+ distributedDataStore.onGlobalContextUpdated(
+ SchemaContextHelper.full());