boolean handleMessage(Object message, EntityOwnershipShard shard) {
boolean handled = true;
- if(CommitTransactionReply.isSerializedType(message)) {
+ if (CommitTransactionReply.isSerializedType(message)) {
// Successful reply from a local commit.
inflightCommitSucceeded(shard);
- } else if(message instanceof akka.actor.Status.Failure) {
+ } else if (message instanceof akka.actor.Status.Failure) {
// Failure reply from a local commit.
- inflightCommitFailure(((Failure)message).cause(), shard);
- } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
+ inflightCommitFailure(((Failure) message).cause(), shard);
+ } else if (COMMIT_RETRY_MESSAGE.equals(message)) {
retryInflightCommit(shard);
} else {
handled = false;
private void retryInflightCommit(EntityOwnershipShard shard) {
// Shouldn't be null happen but verify anyway
- if(inflightCommit == null) {
+ if (inflightCommit == null) {
return;
}
- if(shard.hasLeader()) {
+ if (shard.hasLeader()) {
log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
shard.tryCommitModifications(inflightCommit);
void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
// This should've originated from a failed inflight commit but verify anyway
- if(inflightCommit == null) {
+ if (inflightCommit == null) {
return;
}
log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
- if(!(cause instanceof NoShardLeaderException)) {
+ if (!(cause instanceof NoShardLeaderException)) {
// If the failure is other than NoShardLeaderException the commit may have been partially
// processed so retry with a new transaction ID to be safe.
newInflightCommitWithDifferentTransactionID();
void inflightCommitSucceeded(EntityOwnershipShard shard) {
// Shouldn't be null but verify anyway
- if(inflightCommit == null) {
+ if (inflightCommit == null) {
return;
}
- if(retryCommitSchedule != null) {
+ if (retryCommitSchedule != null) {
retryCommitSchedule.cancel();
}
}
void commitNextBatch(EntityOwnershipShard shard) {
- if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
+ if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
return;
}
inflightCommit = newBatchedModifications();
Iterator<Modification> iter = pendingModifications.iterator();
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
inflightCommit.addModification(iter.next());
iter.remove();
- if(inflightCommit.getModifications().size() >=
- shard.getDatastoreContext().getShardBatchedModificationCount()) {
+ if (inflightCommit.getModifications().size()
+ >= shard.getDatastoreContext().getShardBatchedModificationCount()) {
break;
}
}
}
void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
- if(modifications.getModifications().isEmpty()) {
+ if (modifications.getModifications().isEmpty()) {
return;
}
boolean hasLeader = shard.hasLeader();
- if(inflightCommit != null || !hasLeader) {
- if(log.isDebugEnabled()) {
+ if (inflightCommit != null || !hasLeader) {
+ if (log.isDebugEnabled()) {
log.debug("{} - adding modifications to pending",
inflightCommit != null ? "A commit is inflight" : "No shard leader");
}
possiblyPrunePendingCommits(shard, isLeader);
- if(!isLeader && inflightCommit != null) {
+ if (!isLeader && inflightCommit != null) {
// We're no longer the leader but we have an inflight local commit. This likely means we didn't get
// consensus for the commit and switched to follower due to another node with a higher term. We
// can't be sure if the commit was replicated to any node so we retry it here with a new
// transaction ID.
- if(retryCommitSchedule != null) {
+ if (retryCommitSchedule != null) {
retryCommitSchedule.cancel();
}
shard.convertPendingTransactionsToMessages();
// Prune the inflightCommit.
- if(inflightCommit != null) {
+ if (inflightCommit != null) {
inflightCommit = pruneModifications(inflightCommit);
}
// Prune the subsequent pending modifications.
Iterator<Modification> iter = pendingModifications.iterator();
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
Modification mod = iter.next();
- if(!canForwardModificationToNewLeader(mod)) {
+ if (!canForwardModificationToNewLeader(mod)) {
iter.remove();
}
}
@Nullable
private BatchedModifications pruneModifications(BatchedModifications toPrune) {
- BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(), toPrune.getVersion());
+ BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(),
+ toPrune.getVersion());
prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
prunedModifications.setReady(toPrune.isReady());
prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
- for(Modification mod: toPrune.getModifications()) {
- if(canForwardModificationToNewLeader(mod)) {
+ for (Modification mod: toPrune.getModifications()) {
+ if (canForwardModificationToNewLeader(mod)) {
prunedModifications.addModification(mod);
}
}