+ public static DistributedEntityOwnershipService start(final ActorUtils context,
+ final EntityOwnerSelectionStrategyConfig strategyConfig) {
+ ActorRef shardManagerActor = context.getShardManager();
+
+ Configuration configuration = context.getConfiguration();
+ Collection<MemberName> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
+ CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
+ "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
+ newShardBuilder(context, strategyConfig), null);
+
+ Future<Object> createFuture = context.executeOperationAsync(shardManagerActor, createShard, MESSAGE_TIMEOUT);
+ createFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+ } else {
+ LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
+ }
+ }
+ }, context.getClientDispatcher());
+
+ return new DistributedEntityOwnershipService(context);
+ }
+
+ private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) {
+ Future<Object> future = context.executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ // FIXME: CONTROLLER-1904: reduce the severity to info once we have a retry mechanism
+ LOG.error("Error sending message {} to {}", message, shardActor, failure);
+ } else {
+ LOG.debug("{} message to {} succeeded", message, shardActor);
+ }
+ }
+ }, context.getClientDispatcher());
+ }
+
+ @VisibleForTesting
+ void executeLocalEntityOwnershipShardOperation(final Object message) {
+ if (localEntityOwnershipShard == null) {
+ Future<ActorRef> future = context.findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME);
+ future.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef shardActor) {
+ if (failure != null) {
+ // FIXME: CONTROLLER-1904: reduce the severity to info once we have a retry mechanism
+ LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+ } else {
+ localEntityOwnershipShard = shardActor;
+ executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+ }
+ }
+ }, context.getClientDispatcher());
+
+ } else {
+ executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+ }