import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.lang.reflect.Constructor;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
return datastoreContextBuilder;
}
- public AbstractDataStore setupDistributedDataStore(final String typeName, final String... shardNames) {
- return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames);
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader,
+ final SchemaContext schemaContext) throws Exception {
+ return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader, schemaContext);
}
- public AbstractDataStore setupDistributedDataStore(final String typeName, final boolean waitUntilLeader,
- final String... shardNames) {
- return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader,
+ public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+ final String modulesConfig,
+ final boolean waitUntilLeader,
+ final SchemaContext schemaContext,
+ final String... shardNames) throws Exception {
+ return (DistributedDataStore) setupAbstractDataStore(DistributedDataStore.class, typeName, moduleShardsConfig,
+ modulesConfig, waitUntilLeader, schemaContext, shardNames);
+ }
+
+ public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
+ final String typeName, final String... shardNames)
+ throws Exception {
+ return setupAbstractDataStore(implementation, typeName, "module-shards.conf", true,
+ SchemaContextHelper.full(), shardNames);
+ }
+
+ public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
+ final String typeName, final boolean waitUntilLeader,
+ final String... shardNames) throws Exception {
+ return setupAbstractDataStore(implementation, typeName, "module-shards.conf", waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
- final boolean waitUntilLeader, final String... shardNames) {
- return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
+ public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
+ final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader, final String... shardNames)
+ throws Exception {
+ return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, waitUntilLeader,
SchemaContextHelper.full(), shardNames);
}
- public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
- final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
- return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
+ public AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
+ final String typeName, final String moduleShardsConfig,
+ final boolean waitUntilLeader,
+ final SchemaContext schemaContext,
+ final String... shardNames) throws Exception {
+ return setupAbstractDataStore(implementation, typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
schemaContext, shardNames);
}
- public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
- final String modulesConfig, final boolean waitUntilLeader,
- final SchemaContext schemaContext, final String... shardNames) {
+ private AbstractDataStore setupAbstractDataStore(final Class<? extends AbstractDataStore> implementation,
+ final String typeName, final String moduleShardsConfig,
+ final String modulesConfig, final boolean waitUntilLeader,
+ final SchemaContext schemaContext, final String... shardNames)
+ throws Exception {
final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
datastoreContextBuilder.dataStoreName(typeName);
- DatastoreContext datastoreContext = datastoreContextBuilder.build();
- DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+ final DatastoreContext datastoreContext = datastoreContextBuilder.build();
+ final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
- AbstractDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
- restoreFromSnapshot);
+ final Constructor constructor = implementation.getDeclaredConstructor(
+ ActorSystem.class, ClusterWrapper.class, Configuration.class,
+ DatastoreContextFactory.class, DatastoreSnapshot.class);
+
+ final AbstractDataStore dataStore = (AbstractDataStore) constructor.newInstance(
+ getSystem(), cluster, config, mockContextFactory, restoreFromSnapshot);
dataStore.onGlobalContextUpdated(schemaContext);
return shard;
}
- public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) {
+ public static void waitUntilShardIsDown(final ActorContext actorContext, final String shardName) {
for (int i = 0; i < 20 * 5 ; i++) {
LOG.debug("Waiting for shard down {}", shardName);
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);