X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=9cbcbf9dea73c0750ea51603eee701db6ec42cfb;hp=6c15ef6ed05cb2fb41b7b837e3655b0fef167beb;hb=4e3f49788c05730b29468deebc2aaa4ed0d94eef;hpb=81cc10db365aa8cde38a3d2777488bb83bd69ef5 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 6c15ef6ed0..9cbcbf9dea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -13,6 +13,7 @@ import akka.actor.Status.Failure; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; @@ -28,29 +29,34 @@ import scala.concurrent.duration.FiniteDuration; * @author Thomas Pantelis */ class EntityOwnershipShardCommitCoordinator { - private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry"; + private static final Object COMMIT_RETRY_MESSAGE = new Object() { + @Override + public String toString() { + return "entityCommitRetry"; + } + }; private final Logger log; private int transactionIDCounter = 0; - private final String localMemberName; + private final MemberName localMemberName; private final Queue pendingModifications = new LinkedList<>(); private BatchedModifications inflightCommit; private Cancellable retryCommitSchedule; - EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) { + EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) { this.localMemberName = localMemberName; this.log = log; } boolean handleMessage(Object message, EntityOwnershipShard shard) { boolean handled = true; - if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) { + if(CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. inflightCommitSucceeded(shard); } else if(message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. inflightCommitFailure(((Failure)message).cause(), shard); - } else if(message.equals(COMMIT_RETRY_MESSAGE)) { + } else if(COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; @@ -140,18 +146,26 @@ class EntityOwnershipShardCommitCoordinator { } void commitModification(Modification modification, EntityOwnershipShard shard) { + BatchedModifications modifications = newBatchedModifications(); + modifications.addModification(modification); + commitModifications(modifications, shard); + } + + void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { + if(modifications.getModifications().isEmpty()) { + return; + } + boolean hasLeader = shard.hasLeader(); if(inflightCommit != null || !hasLeader) { if(log.isDebugEnabled()) { - log.debug("{} - adding modification to pending", + log.debug("{} - adding modifications to pending", (inflightCommit != null ? "A commit is inflight" : "No shard leader")); } - pendingModifications.add(modification); + pendingModifications.addAll(modifications.getModifications()); } else { - inflightCommit = newBatchedModifications(); - inflightCommit.addModification(modification); - + inflightCommit = modifications; shard.tryCommitModifications(inflightCommit); } } @@ -179,7 +193,7 @@ class EntityOwnershipShardCommitCoordinator { inflightCommit = newBatchedModifications; } - private BatchedModifications newBatchedModifications() { + BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), DataStoreVersions.CURRENT_VERSION, "");