- void waitForRecoveryComplete() {
- assertEquals("Recovery complete", true,
- Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
- }
-
- public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
- return new Builder(datastoreContextBuilder);
- }
-
- private static class Builder extends ShardManager.Builder {
- Builder(DatastoreContext.Builder datastoreContextBuilder) {
- cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
- datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
- waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
- }
-
- @Override
- public Props props() {
- verify();
- return Props.create(TestShardManager.class, this);
- }
- }
-
- @Override
- public void saveSnapshot(Object obj) {
- snapshot = (ShardManagerSnapshot) obj;
- snapshotPersist.countDown();
- }
-
- void verifySnapshotPersisted(Set<String> shardList) {
- assertEquals("saveSnapshot invoked", true,
- Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
- assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
- }
-
- @Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
- if(shardActors.get(info.getShardName()) != null){
- return shardActors.get(info.getShardName());
- }
- return super.newShardActor(schemaContext, info);
- }
-
- public void addShardActor(String shardName, ActorRef actorRef){
- shardActors.put(shardName, actorRef);
- }
- }
-
- private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
- private static final long serialVersionUID = 1L;
- private final Creator<ShardManager> delegate;
-
- public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public ShardManager create() throws Exception {
- return delegate.create();
- }
- }
-
- interface MessageInterceptor extends Function<Object, Object> {
- boolean canIntercept(Object message);
- }
-
- private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
- return new MessageInterceptor(){
- @Override
- public Object apply(Object message) {
- return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
- }
-
- @Override
- public boolean canIntercept(Object message) {
- return message instanceof FindPrimary;
- }
- };
- }
-
- private static class ForwardingShardManager extends ShardManager {
- private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
- private CountDownLatch memberUpReceived = new CountDownLatch(1);
- private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
- private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
- private CountDownLatch memberReachableReceived = new CountDownLatch(1);
- private final ActorRef shardActor;
- private final String name;
- private final CountDownLatch snapshotPersist = new CountDownLatch(1);
- private ShardManagerSnapshot snapshot;
- private volatile MessageInterceptor messageInterceptor;
-
- public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
- super(builder);
- this.shardActor = shardActor;
- this.name = name;
- }
-
- void setMessageInterceptor(MessageInterceptor messageInterceptor) {
- this.messageInterceptor = messageInterceptor;
- }
-
-