+
+ }};
+
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+ throws Exception {
+ final CountDownLatch persistLatch = new CountDownLatch(1);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ @Override
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+ = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+ return dataPersistenceProviderMonitor;
+ }
+ };
+ }
+ };
+
+ new JavaTestKit(getSystem()) {{
+
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("Persisted", true,
+ Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
+
+ }};
+ }
+
+
+
+ private static class TestShardManager extends ShardManager {
+ private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+ TestShardManager(String shardMrgIDSuffix) {
+ super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().build());
+ }
+
+ @Override
+ public void handleRecover(Object message) throws Exception {
+ try {
+ super.handleRecover(message);
+ } finally {
+ if(message instanceof RecoveryCompleted) {
+ recoveryComplete.countDown();
+ }
+ }
+ }
+
+ void waitForRecoveryComplete() {
+ assertEquals("Recovery complete", true,
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class TestShardManagerCreator implements Creator<TestShardManager> {
+ String shardMrgIDSuffix;
+
+ TestShardManagerCreator(String shardMrgIDSuffix) {
+ this.shardMrgIDSuffix = shardMrgIDSuffix;
+ }
+
+ @Override
+ public TestShardManager create() throws Exception {
+ return new TestShardManager(shardMrgIDSuffix);
+ }
+
+ }
+
+ private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+ private Creator<ShardManager> delegate;
+
+ public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return delegate.create();
+ }
+ }