X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=97e6c62037b2801e1ea1e6521cc8b77597be3335;hb=4d1709660b7af992d4c382a2a38debb5c7d64fb9;hp=6c15ef6ed05cb2fb41b7b837e3655b0fef167beb;hpb=81cc10db365aa8cde38a3d2777488bb83bd69ef5;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 6c15ef6ed0..97e6c62037 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -10,12 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; +import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -28,29 +34,37 @@ import scala.concurrent.duration.FiniteDuration; * @author Thomas Pantelis */ class EntityOwnershipShardCommitCoordinator { - private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry"; + private static final Object COMMIT_RETRY_MESSAGE = new Object() { + @Override + public String toString() { + return "entityCommitRetry"; + } + }; + private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal"); - private final Logger log; - private int transactionIDCounter = 0; - private final String localMemberName; private final Queue pendingModifications = new LinkedList<>(); + private final LocalHistoryIdentifier historyId; + private final Logger log; + private BatchedModifications inflightCommit; private Cancellable retryCommitSchedule; + private long transactionIDCounter = 0; - EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) { - this.localMemberName = localMemberName; - this.log = log; + EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) { + this.log = Preconditions.checkNotNull(log); + historyId = new LocalHistoryIdentifier( + ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0); } boolean handleMessage(Object message, EntityOwnershipShard shard) { boolean handled = true; - if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) { + if(CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. inflightCommitSucceeded(shard); } else if(message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. inflightCommitFailure(((Failure)message).cause(), shard); - } else if(message.equals(COMMIT_RETRY_MESSAGE)) { + } else if(COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; @@ -140,18 +154,26 @@ class EntityOwnershipShardCommitCoordinator { } void commitModification(Modification modification, EntityOwnershipShard shard) { + BatchedModifications modifications = newBatchedModifications(); + modifications.addModification(modification); + commitModifications(modifications, shard); + } + + void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) { + if(modifications.getModifications().isEmpty()) { + return; + } + boolean hasLeader = shard.hasLeader(); if(inflightCommit != null || !hasLeader) { if(log.isDebugEnabled()) { - log.debug("{} - adding modification to pending", + log.debug("{} - adding modifications to pending", (inflightCommit != null ? "A commit is inflight" : "No shard leader")); } - pendingModifications.add(modification); + pendingModifications.addAll(modifications.getModifications()); } else { - inflightCommit = newBatchedModifications(); - inflightCommit.addModification(modification); - + inflightCommit = modifications; shard.tryCommitModifications(inflightCommit); } } @@ -179,10 +201,9 @@ class EntityOwnershipShardCommitCoordinator { inflightCommit = newBatchedModifications; } - private BatchedModifications newBatchedModifications() { + BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( - TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), - DataStoreVersions.CURRENT_VERSION, ""); + new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); modifications.setReady(true); modifications.setTotalMessagesSent(1);