+ private void onCreateShard(CreateShard createShard) {
+ Object reply;
+ try {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ if(localShards.containsKey(moduleShardConfig.getShardName())) {
+ throw new IllegalStateException(String.format("Shard with name %s already exists",
+ moduleShardConfig.getShardName()));
+ }
+
+ configuration.addModuleShardConfiguration(moduleShardConfig);
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+ Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+ moduleShardConfig.getShardMemberNames()*/);
+
+ LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
+
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = datastoreContext;
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
+
+ ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+ localShards.put(info.getShardName(), info);
+
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+
+ reply = new CreateShardReply();
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
+
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
+ private void checkReady(){
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+
+ private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+ LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+ shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
+ if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ primaryShardInfoCache.remove(shardInformation.getShardName());
+ }
+
+ checkReady();
+ } else {
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ }
+ }
+
+ private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ ShardInformation shardInfo = message.getShardInfo();
+
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ shardInfo.getShardName());
+
+ shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+ if(!shardInfo.isShardInitialized()) {
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ } else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ }
+ }
+