-public class ShardManager extends AbstractUntypedActor {
-
- // Stores a mapping between a shard name and the address of the current primary
- private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
-
- // Stores a mapping between a member name and the address of the member
- private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
-
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- private final ActorPath defaultShardPath;
-
- /**
- *
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
- */
- private ShardManager(String type){
- ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type);
- defaultShardPath = actor.path();
- }
-
- public static Props props(final String type){
- return Props.create(new Creator<ShardManager>(){
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type);
- }
- });
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if (message instanceof FindPrimary) {
- FindPrimary msg = ((FindPrimary) message);
- String shardName = msg.getShardName();
- if(Shard.DEFAULT_NAME.equals(shardName)){
- getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
- }
- } else if(message instanceof UpdateSchemaContext){
- // FIXME : Notify all local shards of a context change
- getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
- }
- }
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ // Stores a mapping between a member name and the address of the member
+ // Member names look like "member-1", "member-2" etc and are as specified
+ // in configuration
+ private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+ // Stores a mapping between a shard name and it's corresponding information
+ // Shard names look like inventory, topology etc and are as specified in
+ // configuration
+ private final Map<String, ShardInformation> localShards = new HashMap<>();
+
+ // The type of a ShardManager reflects the type of the datastore itself
+ // A data store could be of type config/operational
+ private final String type;
+
+ private final ClusterWrapper cluster;
+
+ private final Configuration configuration;
+
+ private final String shardDispatcherPath;
+
+ private ShardManagerInfoMBean mBean;
+
+ private final DatastoreContext datastoreContext;
+
+ private Collection<String> knownModules = Collections.emptySet();
+
+ private final DataPersistenceProvider dataPersistenceProvider;
+
+ /**
+ */
+ protected ShardManager(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext) {
+
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.datastoreContext = datastoreContext;
+ this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+ this.type = datastoreContext.getDataStoreType();
+ this.shardDispatcherPath =
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ createLocalShards();
+ }
+
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
+ }
+
+ public static Props props(
+ final ClusterWrapper cluster,
+ final Configuration configuration,
+ final DatastoreContext datastoreContext) {
+
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
+ }
+
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(FindPrimary.fromSerializable(message));
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ unknownMessage(message);
+ }
+
+ }
+
+ private void onActorInitialized(Object message) {
+ final ActorRef sender = getSender();
+
+ if (sender == null) {
+ return; //why is a non-actor sending this message? Just ignore.
+ }
+
+ String actorName = sender.path().name();
+ //find shard name from actor name; actor name is stringified shardId
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+ if (shardId.getShardName() == null) {
+ return;
+ }
+ markShardAsInitialized(shardId.getShardName());
+ }
+
+ private void markShardAsInitialized(String shardName) {
+ LOG.debug("Initializing shard [{}]", shardName);
+ ShardInformation shardInformation = localShards.get(shardName);
+ if (shardInformation != null) {
+ shardInformation.setActorInitialized();
+ }
+ }
+
+ @Override
+ protected void handleRecover(Object message) throws Exception {
+ if(dataPersistenceProvider.isRecoveryApplicable()) {
+ if (message instanceof SchemaContextModules) {
+ SchemaContextModules msg = (SchemaContextModules) message;
+ knownModules = ImmutableSet.copyOf(msg.getModules());
+ } else if (message instanceof RecoveryFailure) {
+ RecoveryFailure failure = (RecoveryFailure) message;
+ LOG.error("Recovery failed", failure.cause());
+ } else if (message instanceof RecoveryCompleted) {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal except the last one
+ deleteMessages(lastSequenceNr() - 1);
+ }
+ } else {
+ if (message instanceof RecoveryCompleted) {
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal
+ deleteMessages(lastSequenceNr());
+ }
+ }
+ }
+
+ private void findLocalShard(FindLocalShard message) {
+ final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+ if(shardInformation == null){
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ return;
+ }
+
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ @Override
+ public Object get() {
+ return new LocalShardFound(shardInformation.getActor());
+ }
+ });
+ }
+
+ private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+ final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized()) {
+ if(waitUntilInitialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+ shardInformation.addRunnableOnInitialized(new Runnable() {
+ @Override
+ public void run() {
+ sender.tell(messageSupplier.get(), self);
+ }
+ });
+ } else {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ }
+
+ return;
+ }
+
+ getSender().tell(messageSupplier.get(), getSelf());
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName, message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
+ private void updateSchemaContext(final Object message) {
+ final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+ Set<String> newModules = new HashSet<>(128);
+
+ for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+ String s = moduleIdentifier.getNamespace().toString();
+ newModules.add(s);
+ }
+
+ if(newModules.containsAll(knownModules)) {
+
+ LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
+
+ knownModules = ImmutableSet.copyOf(newModules);
+
+ dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+ @Override
+ public void apply(SchemaContextModules param) throws Exception {
+ LOG.debug("Sending new SchemaContext to Shards");
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() == null) {
+ info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+ } else {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }