+
+ @Override
+ public String persistenceId() {
+ return "shard-manager-" + type;
+ }
+
+ @VisibleForTesting
+ Collection<String> getKnownModules() {
+ return knownModules;
+ }
+
+ @VisibleForTesting
+ DataPersistenceProvider getDataPersistenceProvider() {
+ return dataPersistenceProvider;
+ }
+
+ private class ShardInformation {
+ private final ShardIdentifier shardId;
+ private final String shardName;
+ private ActorRef actor;
+ private ActorPath actorPath;
+ private final Map<ShardIdentifier, String> peerAddresses;
+
+ // flag that determines if the actor is ready for business
+ private boolean actorInitialized = false;
+
+ private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+
+ private ShardInformation(String shardName, ShardIdentifier shardId,
+ Map<ShardIdentifier, String> peerAddresses) {
+ this.shardName = shardName;
+ this.shardId = shardId;
+ this.peerAddresses = peerAddresses;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ ActorRef getActor(){
+ return actor;
+ }
+
+ ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ void setActor(ActorRef actor) {
+ this.actor = actor;
+ this.actorPath = actor.path();
+ }
+
+ ShardIdentifier getShardId() {
+ return shardId;
+ }
+
+ Map<ShardIdentifier, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+ peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ if(actor != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
+ }
+
+ actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+ }
+ }
+ }
+
+ boolean isShardInitialized() {
+ return getActor() != null && actorInitialized;
+ }
+
+ void setActorInitialized() {
+ this.actorInitialized = true;
+
+ for(Runnable runnable: runnablesOnInitialized) {
+ runnable.run();
+ }
+
+ runnablesOnInitialized.clear();
+ }
+
+ void addRunnableOnInitialized(Runnable runnable) {
+ runnablesOnInitialized.add(runnable);
+ }
+ }
+
+ private static class ShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+
+ final ClusterWrapper cluster;
+ final Configuration configuration;
+ final DatastoreContext datastoreContext;
+
+ ShardManagerCreator(ClusterWrapper cluster,
+ Configuration configuration, DatastoreContext datastoreContext) {
+ this.cluster = cluster;
+ this.configuration = configuration;
+ this.datastoreContext = datastoreContext;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(cluster, configuration, datastoreContext);
+ }
+ }
+
+ static class SchemaContextModules implements Serializable {
+ private static final long serialVersionUID = -8884620101025936590L;
+
+ private final Set<String> modules;
+
+ SchemaContextModules(Set<String> modules){
+ this.modules = modules;
+ }
+
+ public Set<String> getModules() {
+ return modules;
+ }
+ }