From 81cc10db365aa8cde38a3d2777488bb83bd69ef5 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 13 Aug 2015 23:21:56 -0400 Subject: [PATCH] Bug 4105: Change commit retry mechanism in EntityOwnershipShard Change-Id: Iba640eab1c21672ffe6357531c6d236e65c1cd73 Signed-off-by: Tom Pantelis --- .../controller/cluster/datastore/Shard.java | 2 +- .../entityownership/EntityOwnershipShard.java | 84 ++------ ...EntityOwnershipShardCommitCoordinator.java | 191 ++++++++++++++++++ .../messages/RegisterCandidateLocal.java | 4 +- .../AbstractEntityOwnershipTest.java | 5 +- .../EntityOwnershipShardTest.java | 82 +++++++- 6 files changed, 289 insertions(+), 79 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 6aee29dd40..5025369907 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -692,7 +692,7 @@ public class Shard extends RaftActor { return commitCoordinator; } - protected DatastoreContext getDatastoreContext() { + public DatastoreContext getDatastoreContext() { return datastoreContext; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 3399e4440a..c95ea62320 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -9,22 +9,17 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; +import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; -import akka.dispatch.OnComplete; -import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -38,9 +33,8 @@ import scala.concurrent.Future; * @author Thomas Pantelis */ class EntityOwnershipShard extends Shard { - private int transactionIDCounter = 0; private final String localMemberName; - private final List retryModifications = new ArrayList<>(); + private final EntityOwnershipShardCommitCoordinator commitCoordinator; private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) { return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build(); @@ -50,6 +44,7 @@ class EntityOwnershipShard extends Shard { DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) { super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext); this.localMemberName = localMemberName; + this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG); } @Override @@ -63,7 +58,7 @@ class EntityOwnershipShard extends Shard { onRegisterCandidateLocal((RegisterCandidateLocal)message); } else if(message instanceof UnregisterCandidateLocal) { onUnregisterCandidateLocal((UnregisterCandidateLocal)message); - } else { + } else if(!commitCoordinator.handleMessage(message, this)) { super.onReceiveCommand(message); } } @@ -73,37 +68,22 @@ class EntityOwnershipShard extends Shard { // TODO - add the listener locally. - BatchedModifications modifications = new BatchedModifications( - TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), - DataStoreVersions.CURRENT_VERSION, ""); - modifications.setDoCommitOnReady(true); - modifications.setReady(true); - modifications.setTotalMessagesSent(1); - NormalizedNode entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(), registerCandidate.getEntity().getId(), localMemberName); - modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners)); - - tryCommitModifications(modifications); + commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this); getSender().tell(SuccessReply.INSTANCE, getSelf()); } - private void tryCommitModifications(final BatchedModifications modifications) { + void tryCommitModifications(final BatchedModifications modifications) { if(isLeader()) { - if(isIsolatedLeader()) { - LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID()); - - retryModifications.add(modifications); - } else { - LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID()); - - // Note that it's possible the commit won't get consensus and will timeout and not be applied - // to the state. However we don't need to retry it in that case b/c it will be committed to - // the journal first and, once a majority of followers come back on line and it is replicated, - // it will be applied at that point. - handleBatchedModificationsLocal(modifications, self()); - } + LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID()); + + // Note that it's possible the commit won't get consensus and will timeout and not be applied + // to the state. However we don't need to retry it in that case b/c it will be committed to + // the journal first and, once a majority of followers come back on line and it is replicated, + // it will be applied at that point. + handleBatchedModificationsLocal(modifications, self()); } else { final ActorSelection leader = getLeader(); if (leader != null) { @@ -111,45 +91,21 @@ class EntityOwnershipShard extends Shard { Future future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis( getDatastoreContext().getShardTransactionCommitTimeoutInSeconds())); - future.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, Object response) { - if(failure != null) { - if(failure instanceof AskTimeoutException) { - LOG.debug("BatchedModifications {} to leader {} timed out - retrying", - modifications.getTransactionID(), leader); - tryCommitModifications(modifications); - } else { - LOG.error("BatchedModifications {} to leader {} failed", - modifications.getTransactionID(), leader, failure); - } - } else { - LOG.debug("BatchedModifications {} to leader {} succeeded", - modifications.getTransactionID(), leader); - } - } - }, getContext().dispatcher()); - } else { - LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID()); - - retryModifications.add(modifications); + + Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender()); } } } + boolean hasLeader() { + return getLeader() != null && !isIsolatedLeader(); + } + @Override protected void onStateChanged() { super.onStateChanged(); - if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) { - LOG.debug("# BatchedModifications to retry {}", retryModifications.size()); - - List retryModificationsCopy = new ArrayList<>(retryModifications); - retryModifications.clear(); - for(BatchedModifications mods: retryModificationsCopy) { - tryCommitModifications(mods); - } - } + commitCoordinator.onStateChanged(this, isLeader()); } private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java new file mode 100644 index 0000000000..6c15ef6ed0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Status.Failure; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.slf4j.Logger; +import scala.concurrent.duration.FiniteDuration; + +/** + * Handles commits and retries for the EntityOwnershipShard. + * + * @author Thomas Pantelis + */ +class EntityOwnershipShardCommitCoordinator { + private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry"; + + private final Logger log; + private int transactionIDCounter = 0; + private final String localMemberName; + private final Queue pendingModifications = new LinkedList<>(); + private BatchedModifications inflightCommit; + private Cancellable retryCommitSchedule; + + EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) { + this.localMemberName = localMemberName; + this.log = log; + } + + boolean handleMessage(Object message, EntityOwnershipShard shard) { + boolean handled = true; + if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) { + // Successful reply from a local commit. + inflightCommitSucceeded(shard); + } else if(message instanceof akka.actor.Status.Failure) { + // Failure reply from a local commit. + inflightCommitFailure(((Failure)message).cause(), shard); + } else if(message.equals(COMMIT_RETRY_MESSAGE)) { + retryInflightCommit(shard); + } else { + handled = false; + } + + return handled; + } + + private void retryInflightCommit(EntityOwnershipShard shard) { + // Shouldn't be null happen but verify anyway + if(inflightCommit == null) { + return; + } + + if(shard.hasLeader()) { + log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID()); + + shard.tryCommitModifications(inflightCommit); + } else { + scheduleInflightCommitRetry(shard); + } + } + + void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) { + // This should've originated from a failed inflight commit but verify anyway + if(inflightCommit == null) { + return; + } + + log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause); + + if(!(cause instanceof NoShardLeaderException)) { + // If the failure is other than NoShardLeaderException the commit may have been partially + // processed so retry with a new transaction ID to be safe. + newInflightCommitWithDifferentTransactionID(); + } + + scheduleInflightCommitRetry(shard); + } + + private void scheduleInflightCommitRetry(EntityOwnershipShard shard) { + FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval(); + + log.debug("Scheduling retry for BatchedModifications commit {} in {}", + inflightCommit.getTransactionID(), duration); + + retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(), + COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender()); + } + + void inflightCommitSucceeded(EntityOwnershipShard shard) { + // Shouldn't be null but verify anyway + if(inflightCommit == null) { + return; + } + + if(retryCommitSchedule != null) { + retryCommitSchedule.cancel(); + } + + log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID()); + + inflightCommit = null; + commitNextBatch(shard); + } + + void commitNextBatch(EntityOwnershipShard shard) { + if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) { + return; + } + + inflightCommit = newBatchedModifications(); + Iterator iter = pendingModifications.iterator(); + while(iter.hasNext()) { + inflightCommit.addModification(iter.next()); + iter.remove(); + if(inflightCommit.getModifications().size() >= + shard.getDatastoreContext().getShardBatchedModificationCount()) { + break; + } + } + + log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(), + inflightCommit.getModifications().size()); + + shard.tryCommitModifications(inflightCommit); + } + + void commitModification(Modification modification, EntityOwnershipShard shard) { + boolean hasLeader = shard.hasLeader(); + if(inflightCommit != null || !hasLeader) { + if(log.isDebugEnabled()) { + log.debug("{} - adding modification to pending", + (inflightCommit != null ? "A commit is inflight" : "No shard leader")); + } + + pendingModifications.add(modification); + } else { + inflightCommit = newBatchedModifications(); + inflightCommit.addModification(modification); + + shard.tryCommitModifications(inflightCommit); + } + } + + void onStateChanged(EntityOwnershipShard shard, boolean isLeader) { + if(!isLeader && inflightCommit != null) { + // We're no longer the leader but we have an inflight local commit. This likely means we didn't get + // consensus for the commit and switched to follower due to another node with a higher term. We + // can't be sure if the commit was replicated to any node so we retry it here with a new + // transaction ID. + if(retryCommitSchedule != null) { + retryCommitSchedule.cancel(); + } + + newInflightCommitWithDifferentTransactionID(); + retryInflightCommit(shard); + } else { + commitNextBatch(shard); + } + } + + private void newInflightCommitWithDifferentTransactionID() { + BatchedModifications newBatchedModifications = newBatchedModifications(); + newBatchedModifications.getModifications().addAll(inflightCommit.getModifications()); + inflightCommit = newBatchedModifications; + } + + private BatchedModifications newBatchedModifications() { + BatchedModifications modifications = new BatchedModifications( + TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), + DataStoreVersions.CURRENT_VERSION, ""); + modifications.setDoCommitOnReady(true); + modifications.setReady(true); + modifications.setTotalMessagesSent(1); + return modifications; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java index 6ba09a0729..72c95e0c23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java @@ -35,8 +35,8 @@ public class RegisterCandidateLocal { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=") - .append(entity).append("]"); + builder.append("RegisterCandidateLocal [entity=").append(entity).append(", candidate=").append(candidate) + .append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java index 3d31a06349..c6ef27545a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME; @@ -65,7 +66,9 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest { MapNode entityTypeMapNode = (MapNode) childNode.get(); Optional entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates( childMap, child, key)); - assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent()); + if(!entityTypeEntry.isPresent()) { + fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue()); + } return entityTypeEntry.get(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index e4aaaa1888..d4c59cc396 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -18,7 +18,9 @@ import akka.testkit.TestActorRef; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -200,7 +203,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100); + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100). + shardBatchedModificationCount(5); String peerId = actorFactory.generateActorId("leader"); TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). @@ -221,9 +225,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { MockLeader leader = peer.underlyingActor(); assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1, + LOCAL_MEMBER_NAME); + + shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender()); - leader.modificationsReceived = new CountDownLatch(2); + // Test with initial commit timeout and subsequent retry. + + leader.modificationsReceived = new CountDownLatch(1); leader.sendReply = false; shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); @@ -233,7 +242,34 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( leader.modificationsReceived, 5, TimeUnit.SECONDS)); - verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME); + verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2, + LOCAL_MEMBER_NAME); + + // Send a bunch of registration messages quickly and verify. + + int max = 100; + leader.delay = 4; + leader.modificationsReceived = new CountDownLatch(max); + List entityIds = new ArrayList<>(); + for(int i = 1; i <= max; i++) { + YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i)); + entityIds.add(id); + shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef()); + } + + assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( + leader.modificationsReceived, 10, TimeUnit.SECONDS)); + + // Sleep a little to ensure no additional BatchedModifications are received. + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + List receivedMods = leader.getAndClearReceivedModifications(); + for(int i = 0; i < max; i++) { + verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME); + } + + assertEquals("# modifications received", max, receivedMods.size()); } private void verifyCommittedEntityCandidate(TestActorRef shard, String entityType, @@ -241,11 +277,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName); } - private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType, + private void verifyBatchedEntityCandidate(List mods, String entityType, YangInstanceIdentifier entityId, String candidateName) throws Exception { - assertEquals("BatchedModifications size", 1, mods.getModifications().size()); - assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass()); - verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType, + assertEquals("BatchedModifications size", 1, mods.size()); + verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName); + } + + private void verifyBatchedEntityCandidate(Modification mod, String entityType, + YangInstanceIdentifier entityId, String candidateName) throws Exception { + assertEquals("Modification type", MergeModification.class, mod.getClass()); + verifyEntityCandidate(((MergeModification)mod).getData(), entityType, entityId, candidateName); } @@ -306,20 +347,39 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public static class MockLeader extends UntypedActor { volatile CountDownLatch modificationsReceived = new CountDownLatch(1); - volatile BatchedModifications receivedModifications; + List receivedModifications = new ArrayList<>(); volatile boolean sendReply = true; + volatile long delay; @Override public void onReceive(Object message) { if(message instanceof BatchedModifications) { - receivedModifications = (BatchedModifications) message; - modificationsReceived.countDown(); + if(delay > 0) { + Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); + } + if(sendReply) { + BatchedModifications mods = (BatchedModifications) message; + synchronized (receivedModifications) { + for(int i = 0; i < mods.getModifications().size(); i++) { + receivedModifications.add(mods.getModifications().get(i)); + modificationsReceived.countDown(); + } + } + getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { sendReply = true; } } } + + List getAndClearReceivedModifications() { + synchronized (receivedModifications) { + List ret = new ArrayList<>(receivedModifications); + receivedModifications.clear(); + return ret; + } + } } } -- 2.36.6