+ private void onCreateShard(CreateShard createShard) {
+ Object reply;
+ try {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ if(localShards.containsKey(moduleShardConfig.getShardName())) {
+ throw new IllegalStateException(String.format("Shard with name %s already exists",
+ moduleShardConfig.getShardName()));
+ }
+
+ configuration.addModuleShardConfiguration(moduleShardConfig);
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+ Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+ moduleShardConfig.getShardMemberNames()*/);
+
+ LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
+
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = datastoreContext;
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
+
+ ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+ localShards.put(info.getShardName(), info);
+
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+
+ reply = new CreateShardReply();
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
+
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
+ private void checkReady(){
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+
+ private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {