X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShardCommitCoordinator.java;h=97e6c62037b2801e1ea1e6521cc8b77597be3335;hb=70b924258e6c87219c38bcfefb3781ee3d2e2d02;hp=5118de46c119a22bafa98258bd9afc8a052437fc;hpb=5a4560d475f0ed328275f1a5c7a5dae292acfb02;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java index 5118de46c1..97e6c62037 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -10,12 +10,18 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status.Failure; +import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -28,18 +34,26 @@ import scala.concurrent.duration.FiniteDuration; * @author Thomas Pantelis */ class EntityOwnershipShardCommitCoordinator { - private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry"; + private static final Object COMMIT_RETRY_MESSAGE = new Object() { + @Override + public String toString() { + return "entityCommitRetry"; + } + }; + private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal"); - private final Logger log; - private int transactionIDCounter = 0; - private final String localMemberName; private final Queue pendingModifications = new LinkedList<>(); + private final LocalHistoryIdentifier historyId; + private final Logger log; + private BatchedModifications inflightCommit; private Cancellable retryCommitSchedule; + private long transactionIDCounter = 0; - EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) { - this.localMemberName = localMemberName; - this.log = log; + EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) { + this.log = Preconditions.checkNotNull(log); + historyId = new LocalHistoryIdentifier( + ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0); } boolean handleMessage(Object message, EntityOwnershipShard shard) { @@ -50,7 +64,7 @@ class EntityOwnershipShardCommitCoordinator { } else if(message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. inflightCommitFailure(((Failure)message).cause(), shard); - } else if(message.equals(COMMIT_RETRY_MESSAGE)) { + } else if(COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; @@ -189,8 +203,7 @@ class EntityOwnershipShardCommitCoordinator { BatchedModifications newBatchedModifications() { BatchedModifications modifications = new BatchedModifications( - TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), - DataStoreVersions.CURRENT_VERSION, ""); + new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION); modifications.setDoCommitOnReady(true); modifications.setReady(true); modifications.setTotalMessagesSent(1);