X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=a6958297443e9cb7e635f300a96c1b3346e715e6;hb=4dc70853503713e7ed729815e2ce7bfd750b2bd3;hp=c792cf1dda996370b52002fa952b5689088d73df;hpb=07c96b0fa318b7bf559df4954f705d06a44f1354;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index c792cf1dda..a695829744 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -62,13 +62,13 @@ class EntityOwnershipShardCommitCoordinator { boolean handleMessage(Object message, EntityOwnershipShard shard) { boolean handled = true; - if(CommitTransactionReply.isSerializedType(message)) { + if (CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. inflightCommitSucceeded(shard); - } else if(message instanceof akka.actor.Status.Failure) { + } else if (message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. - inflightCommitFailure(((Failure)message).cause(), shard); - } else if(COMMIT_RETRY_MESSAGE.equals(message)) { + inflightCommitFailure(((Failure) message).cause(), shard); + } else if (COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; @@ -79,12 +79,12 @@ class EntityOwnershipShardCommitCoordinator { private void retryInflightCommit(EntityOwnershipShard shard) { // Shouldn't be null happen but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - if(shard.hasLeader()) { - log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID()); + if (shard.hasLeader()) { + log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId()); shard.tryCommitModifications(inflightCommit); } else { @@ -94,13 +94,13 @@ class EntityOwnershipShardCommitCoordinator { void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) { // This should've originated from a failed inflight commit but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause); + log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause); - if(!(cause instanceof NoShardLeaderException)) { + if (!(cause instanceof NoShardLeaderException)) { // If the failure is other than NoShardLeaderException the commit may have been partially // processed so retry with a new transaction ID to be safe. newInflightCommitWithDifferentTransactionID(); @@ -113,7 +113,7 @@ class EntityOwnershipShardCommitCoordinator { FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval(); log.debug("Scheduling retry for BatchedModifications commit {} in {}", - inflightCommit.getTransactionID(), duration); + inflightCommit.getTransactionId(), duration); retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(), COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender()); @@ -121,37 +121,37 @@ class EntityOwnershipShardCommitCoordinator { void inflightCommitSucceeded(EntityOwnershipShard shard) { // Shouldn't be null but verify anyway - if(inflightCommit == null) { + if (inflightCommit == null) { return; } - if(retryCommitSchedule != null) { + if (retryCommitSchedule != null) { retryCommitSchedule.cancel(); } - log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID()); + log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId()); inflightCommit = null; commitNextBatch(shard); } void commitNextBatch(EntityOwnershipShard shard) { - if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { + if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { return; } inflightCommit = newBatchedModifications(); Iterator iter = pendingModifications.iterator(); - while(iter.hasNext()) { + while (iter.hasNext()) { inflightCommit.addModification(iter.next()); iter.remove(); - if(inflightCommit.getModifications().size() >= - shard.getDatastoreContext().getShardBatchedModificationCount()) { + if (inflightCommit.getModifications().size() + >= shard.getDatastoreContext().getShardBatchedModificationCount()) { break; } } - log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(), + log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(), inflightCommit.getModifications().size()); shard.tryCommitModifications(inflightCommit); @@ -164,13 +164,13 @@ class EntityOwnershipShardCommitCoordinator { } void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { - if(modifications.getModifications().isEmpty()) { + if (modifications.getModifications().isEmpty()) { return; } boolean hasLeader = shard.hasLeader(); - if(inflightCommit != null || !hasLeader) { - if(log.isDebugEnabled()) { + if (inflightCommit != null || !hasLeader) { + if (log.isDebugEnabled()) { log.debug("{} - adding modifications to pending", inflightCommit != null ? "A commit is inflight" : "No shard leader"); } @@ -187,12 +187,12 @@ class EntityOwnershipShardCommitCoordinator { possiblyPrunePendingCommits(shard, isLeader); - if(!isLeader && inflightCommit != null) { + if (!isLeader && inflightCommit != null) { // We're no longer the leader but we have an inflight local commit. This likely means we didn't get // consensus for the commit and switched to follower due to another node with a higher term. We // can't be sure if the commit was replicated to any node so we retry it here with a new // transaction ID. - if(retryCommitSchedule != null) { + if (retryCommitSchedule != null) { retryCommitSchedule.cancel(); } @@ -216,15 +216,15 @@ class EntityOwnershipShardCommitCoordinator { shard.convertPendingTransactionsToMessages(); // Prune the inflightCommit. - if(inflightCommit != null) { + if (inflightCommit != null) { inflightCommit = pruneModifications(inflightCommit); } // Prune the subsequent pending modifications. Iterator iter = pendingModifications.iterator(); - while(iter.hasNext()) { + while (iter.hasNext()) { Modification mod = iter.next(); - if(!canForwardModificationToNewLeader(mod)) { + if (!canForwardModificationToNewLeader(mod)) { iter.remove(); } } @@ -233,12 +233,13 @@ class EntityOwnershipShardCommitCoordinator { @Nullable private BatchedModifications pruneModifications(BatchedModifications toPrune) { - BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionID(), toPrune.getVersion()); + BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(), + toPrune.getVersion()); prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady()); prunedModifications.setReady(toPrune.isReady()); prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent()); - for(Modification mod: toPrune.getModifications()) { - if(canForwardModificationToNewLeader(mod)) { + for (Modification mod: toPrune.getModifications()) { + if (canForwardModificationToNewLeader(mod)) { prunedModifications.addModification(mod); } } @@ -265,7 +266,7 @@ class EntityOwnershipShardCommitCoordinator { private void newInflightCommitWithDifferentTransactionID() { BatchedModifications newBatchedModifications = newBatchedModifications(); - newBatchedModifications.getModifications().addAll(inflightCommit.getModifications()); + newBatchedModifications.addModifications(inflightCommit.getModifications()); inflightCommit = newBatchedModifications; }