+ LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+
+ // TODO - add the listener locally.
+
+ BatchedModifications modifications = new BatchedModifications(
+ TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ modifications.setDoCommitOnReady(true);
+ modifications.setReady(true);
+ modifications.setTotalMessagesSent(1);
+
+ NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
+ registerCandidate.getEntity().getId(), localMemberName);
+ modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
+
+ tryCommitModifications(modifications);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ private void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ if(isIsolatedLeader()) {
+ LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ } else {
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ }
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+ Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+ getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure != null) {
+ if(failure instanceof AskTimeoutException) {
+ LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
+ modifications.getTransactionID(), leader);
+ tryCommitModifications(modifications);
+ } else {
+ LOG.error("BatchedModifications {} to leader {} failed",
+ modifications.getTransactionID(), leader, failure);
+ }
+ } else {
+ LOG.debug("BatchedModifications {} to leader {} succeeded",
+ modifications.getTransactionID(), leader);
+ }
+ }
+ }, getContext().dispatcher());
+ } else {
+ LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ }
+ }
+ }
+
+ @Override
+ protected void onStateChanged() {
+ super.onStateChanged();
+
+ if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
+ LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
+
+ List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
+ retryModifications.clear();
+ for(BatchedModifications mods: retryModificationsCopy) {
+ tryCommitModifications(mods);
+ }
+ }
+ }
+
+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ // TODO - implement