- private static class ShardNotInitializedTimeout {
- private final ActorRef sender;
- private final ShardInformation shardInfo;
- private final OnShardInitialized onShardInitialized;
-
- ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
- this.sender = sender;
- this.shardInfo = shardInfo;
- this.onShardInitialized = onShardInitialized;
- }
-
- ActorRef getSender() {
- return sender;
- }
-
- ShardInformation getShardInfo() {
- return shardInfo;
- }
-
- OnShardInitialized getOnShardInitialized() {
- return onShardInitialized;
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
- private ClusterWrapper cluster;
- private Configuration configuration;
- private DatastoreContextFactory datastoreContextFactory;
- private CountDownLatch waitTillReadyCountdownLatch;
- private PrimaryShardInfoFutureCache primaryShardInfoCache;
- private DatastoreSnapshot restoreFromSnapshot;
- private volatile boolean sealed;
-
- @SuppressWarnings("unchecked")
- private T self() {
- return (T) this;
- }
-
- protected void checkSealed() {
- Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
- }
-
- public T cluster(ClusterWrapper cluster) {
- checkSealed();
- this.cluster = cluster;
- return self();
- }
-
- public T configuration(Configuration configuration) {
- checkSealed();
- this.configuration = configuration;
- return self();
- }
-
- public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
- checkSealed();
- this.datastoreContextFactory = datastoreContextFactory;
- return self();
- }
-
- public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
- checkSealed();
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- return self();
- }
-
- public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
- checkSealed();
- this.primaryShardInfoCache = primaryShardInfoCache;
- return self();
- }
-
- public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
- checkSealed();
- this.restoreFromSnapshot = restoreFromSnapshot;
- return self();
- }
-
- protected void verify() {
- sealed = true;
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
- }
-
- public Props props() {
- verify();
- return Props.create(ShardManager.class, this);
- }
- }
-
- public static class Builder extends AbstractBuilder<Builder> {
- }
-
- private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
-
-
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- handler.onFailure(failure);
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
- } else if(response instanceof LocalPrimaryShardFound) {
- handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
- } else {
- handler.onUnknownResponse(response);
- }
- }
- }
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));