+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof RegisterCandidateLocal) {
+ onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ } else if(message instanceof UnregisterCandidateLocal) {
+ onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+
+ private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
+ LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+
+ // TODO - add the listener locally.
+
+ BatchedModifications modifications = new BatchedModifications(
+ TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ modifications.setDoCommitOnReady(true);
+ modifications.setReady(true);
+ modifications.setTotalMessagesSent(1);
+
+ NormalizedNode<?, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
+ modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
+
+ tryCommitModifications(modifications);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ private NormalizedNode<?, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
+ MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
+ ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
+
+ MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
+ addChild(candidateNode).build();
+
+ MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
+ addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
+
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
+ addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
+ }
+
+ private void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ if(isIsolatedLeader()) {
+ LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ } else {
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ }
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+ Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+ getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure != null) {
+ if(failure instanceof AskTimeoutException) {
+ LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
+ modifications.getTransactionID(), leader);
+ tryCommitModifications(modifications);
+ } else {
+ LOG.error("BatchedModifications {} to leader {} failed",
+ modifications.getTransactionID(), leader, failure);
+ }
+ } else {
+ LOG.debug("BatchedModifications {} to leader {} succeeded",
+ modifications.getTransactionID(), leader);
+ }
+ }
+ }, getContext().dispatcher());
+ } else {
+ LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ }
+ }
+ }
+
+ @Override
+ protected void onStateChanged() {
+ super.onStateChanged();
+
+ if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
+ LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
+
+ List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
+ retryModifications.clear();
+ for(BatchedModifications mods: retryModificationsCopy) {
+ tryCommitModifications(mods);
+ }
+ }
+ }
+
+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ // TODO - implement
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+