Bug 4105: Change commit retry mechanism in EntityOwnershipShard 97/26797/1
authorTom Pantelis <tpanteli@brocade.com>
Fri, 14 Aug 2015 03:21:56 +0000 (23:21 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:14:17 +0000 (15:14 -0400)
Change-Id: Iba640eab1c21672ffe6357531c6d236e65c1cd73
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index 6aee29d..5025369 100644 (file)
@@ -692,7 +692,7 @@ public class Shard extends RaftActor {
         return commitCoordinator;
     }
 
-    protected DatastoreContext getDatastoreContext() {
+    public DatastoreContext getDatastoreContext() {
         return datastoreContext;
     }
 
index 3399e44..c95ea62 100644 (file)
@@ -9,22 +9,17 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -38,9 +33,8 @@ import scala.concurrent.Future;
  * @author Thomas Pantelis
  */
 class EntityOwnershipShard extends Shard {
-    private int transactionIDCounter = 0;
     private final String localMemberName;
-    private final List<BatchedModifications> retryModifications = new ArrayList<>();
+    private final EntityOwnershipShardCommitCoordinator commitCoordinator;
 
     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
@@ -50,6 +44,7 @@ class EntityOwnershipShard extends Shard {
             DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
         this.localMemberName = localMemberName;
+        this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
     }
 
     @Override
@@ -63,7 +58,7 @@ class EntityOwnershipShard extends Shard {
             onRegisterCandidateLocal((RegisterCandidateLocal)message);
         } else if(message instanceof UnregisterCandidateLocal) {
             onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
-        } else {
+        } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
     }
@@ -73,37 +68,22 @@ class EntityOwnershipShard extends Shard {
 
         // TODO - add the listener locally.
 
-        BatchedModifications modifications = new BatchedModifications(
-                TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
-                DataStoreVersions.CURRENT_VERSION, "");
-        modifications.setDoCommitOnReady(true);
-        modifications.setReady(true);
-        modifications.setTotalMessagesSent(1);
-
         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
                 registerCandidate.getEntity().getId(), localMemberName);
-        modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
-
-        tryCommitModifications(modifications);
+        commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
 
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
-    private void tryCommitModifications(final BatchedModifications modifications) {
+    void tryCommitModifications(final BatchedModifications modifications) {
         if(isLeader()) {
-            if(isIsolatedLeader()) {
-                LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
-                retryModifications.add(modifications);
-            } else {
-                LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
-
-                // Note that it's possible the commit won't get consensus and will timeout and not be applied
-                // to the state. However we don't need to retry it in that case b/c it will be committed to
-                // the journal first and, once a majority of followers come back on line and it is replicated,
-                // it will be applied at that point.
-                handleBatchedModificationsLocal(modifications, self());
-            }
+            LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+            // Note that it's possible the commit won't get consensus and will timeout and not be applied
+            // to the state. However we don't need to retry it in that case b/c it will be committed to
+            // the journal first and, once a majority of followers come back on line and it is replicated,
+            // it will be applied at that point.
+            handleBatchedModificationsLocal(modifications, self());
         } else {
             final ActorSelection leader = getLeader();
             if (leader != null) {
@@ -111,45 +91,21 @@ class EntityOwnershipShard extends Shard {
 
                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
-                future.onComplete(new OnComplete<Object>() {
-                    @Override
-                    public void onComplete(Throwable failure, Object response) {
-                        if(failure != null) {
-                            if(failure instanceof AskTimeoutException) {
-                                LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
-                                        modifications.getTransactionID(), leader);
-                                tryCommitModifications(modifications);
-                            } else {
-                                LOG.error("BatchedModifications {} to leader {} failed",
-                                        modifications.getTransactionID(), leader, failure);
-                            }
-                        } else {
-                            LOG.debug("BatchedModifications {} to leader {} succeeded",
-                                    modifications.getTransactionID(), leader);
-                        }
-                    }
-                }, getContext().dispatcher());
-            } else {
-                LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
-                retryModifications.add(modifications);
+
+                Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
             }
         }
     }
 
+    boolean hasLeader() {
+        return getLeader() != null && !isIsolatedLeader();
+    }
+
     @Override
     protected void onStateChanged() {
         super.onStateChanged();
 
-        if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
-            LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
-
-            List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
-            retryModifications.clear();
-            for(BatchedModifications mods: retryModificationsCopy) {
-                tryCommitModifications(mods);
-            }
-        }
+        commitCoordinator.onStateChanged(this, isLeader());
     }
 
     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java
new file mode 100644 (file)
index 0000000..6c15ef6
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Status.Failure;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.slf4j.Logger;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Handles commits and retries for the EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipShardCommitCoordinator {
+    private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
+
+    private final Logger log;
+    private int transactionIDCounter = 0;
+    private final String localMemberName;
+    private final Queue<Modification> pendingModifications = new LinkedList<>();
+    private BatchedModifications inflightCommit;
+    private Cancellable retryCommitSchedule;
+
+    EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
+        this.localMemberName = localMemberName;
+        this.log = log;
+    }
+
+    boolean handleMessage(Object message, EntityOwnershipShard shard) {
+        boolean handled = true;
+        if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
+            // Successful reply from a local commit.
+            inflightCommitSucceeded(shard);
+        } else if(message instanceof akka.actor.Status.Failure) {
+            // Failure reply from a local commit.
+            inflightCommitFailure(((Failure)message).cause(), shard);
+        } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
+            retryInflightCommit(shard);
+        } else {
+            handled = false;
+        }
+
+        return handled;
+    }
+
+    private void retryInflightCommit(EntityOwnershipShard shard) {
+        // Shouldn't be null happen but verify anyway
+        if(inflightCommit == null) {
+            return;
+        }
+
+        if(shard.hasLeader()) {
+            log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
+
+            shard.tryCommitModifications(inflightCommit);
+        } else {
+            scheduleInflightCommitRetry(shard);
+        }
+    }
+
+    void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
+        // This should've originated from a failed inflight commit but verify anyway
+        if(inflightCommit == null) {
+            return;
+        }
+
+        log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
+
+        if(!(cause instanceof NoShardLeaderException)) {
+            // If the failure is other than NoShardLeaderException the commit may have been partially
+            // processed so retry with a new transaction ID to be safe.
+            newInflightCommitWithDifferentTransactionID();
+        }
+
+        scheduleInflightCommitRetry(shard);
+    }
+
+    private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
+        FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
+
+        log.debug("Scheduling retry for BatchedModifications commit {} in {}",
+                inflightCommit.getTransactionID(), duration);
+
+        retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
+                COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
+    }
+
+    void inflightCommitSucceeded(EntityOwnershipShard shard) {
+        // Shouldn't be null but verify anyway
+        if(inflightCommit == null) {
+            return;
+        }
+
+        if(retryCommitSchedule != null) {
+            retryCommitSchedule.cancel();
+        }
+
+        log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
+
+        inflightCommit = null;
+        commitNextBatch(shard);
+    }
+
+    void commitNextBatch(EntityOwnershipShard shard) {
+        if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
+            return;
+        }
+
+        inflightCommit = newBatchedModifications();
+        Iterator<Modification> iter = pendingModifications.iterator();
+        while(iter.hasNext()) {
+            inflightCommit.addModification(iter.next());
+            iter.remove();
+            if(inflightCommit.getModifications().size() >=
+                    shard.getDatastoreContext().getShardBatchedModificationCount()) {
+                break;
+            }
+        }
+
+        log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
+                inflightCommit.getModifications().size());
+
+        shard.tryCommitModifications(inflightCommit);
+    }
+
+    void commitModification(Modification modification, EntityOwnershipShard shard) {
+        boolean hasLeader = shard.hasLeader();
+        if(inflightCommit != null || !hasLeader) {
+            if(log.isDebugEnabled()) {
+                log.debug("{} - adding modification to pending",
+                        (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
+            }
+
+            pendingModifications.add(modification);
+        } else {
+            inflightCommit = newBatchedModifications();
+            inflightCommit.addModification(modification);
+
+            shard.tryCommitModifications(inflightCommit);
+        }
+    }
+
+    void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
+        if(!isLeader && inflightCommit != null) {
+            // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
+            // consensus for the commit and switched to follower due to another node with a higher term. We
+            // can't be sure if the commit was replicated to any node so we retry it here with a new
+            // transaction ID.
+            if(retryCommitSchedule != null) {
+                retryCommitSchedule.cancel();
+            }
+
+            newInflightCommitWithDifferentTransactionID();
+            retryInflightCommit(shard);
+        } else {
+            commitNextBatch(shard);
+        }
+    }
+
+    private void newInflightCommitWithDifferentTransactionID() {
+        BatchedModifications newBatchedModifications = newBatchedModifications();
+        newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
+        inflightCommit = newBatchedModifications;
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        BatchedModifications modifications = new BatchedModifications(
+                TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+                DataStoreVersions.CURRENT_VERSION, "");
+        modifications.setDoCommitOnReady(true);
+        modifications.setReady(true);
+        modifications.setTotalMessagesSent(1);
+        return modifications;
+    }
+}
index 6ba09a0..72c95e0 100644 (file)
@@ -35,8 +35,8 @@ public class RegisterCandidateLocal {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=")
-                .append(entity).append("]");
+        builder.append("RegisterCandidateLocal [entity=").append(entity).append(", candidate=").append(candidate)
+                .append("]");
         return builder.toString();
     }
 }
index 3d31a06..c6ef275 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
@@ -65,7 +66,9 @@ public class AbstractEntityOwnershipTest extends AbstractActorTest {
         MapNode entityTypeMapNode = (MapNode) childNode.get();
         Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
                 childMap, child, key));
-        assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent());
+        if(!entityTypeEntry.isPresent()) {
+            fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue());
+        }
         return entityTypeEntry.get();
     }
 }
index e4aaaa1..d4c59cc 100644 (file)
@@ -18,7 +18,9 @@ import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -200,7 +203,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
 
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+                shardBatchedModificationCount(5);
 
         String peerId = actorFactory.generateActorId("leader");
         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
@@ -221,9 +225,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         MockLeader leader = peer.underlyingActor();
         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
+                LOCAL_MEMBER_NAME);
+
+        shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
 
-        leader.modificationsReceived = new CountDownLatch(2);
+        // Test with initial commit timeout and subsequent retry.
+
+        leader.modificationsReceived = new CountDownLatch(1);
         leader.sendReply = false;
 
         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
@@ -233,7 +242,34 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
-        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
+                LOCAL_MEMBER_NAME);
+
+        // Send a bunch of registration messages quickly and verify.
+
+        int max = 100;
+        leader.delay = 4;
+        leader.modificationsReceived = new CountDownLatch(max);
+        List<YangInstanceIdentifier> entityIds = new ArrayList<>();
+        for(int i = 1; i <= max; i++) {
+            YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
+            entityIds.add(id);
+            shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
+        }
+
+        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+                leader.modificationsReceived, 10, TimeUnit.SECONDS));
+
+        // Sleep a little to ensure no additional BatchedModifications are received.
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+        List<Modification> receivedMods = leader.getAndClearReceivedModifications();
+        for(int i = 0; i < max; i++) {
+            verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
+        }
+
+        assertEquals("# modifications received", max, receivedMods.size());
     }
 
     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
@@ -241,11 +277,16 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
     }
 
-    private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+    private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
             YangInstanceIdentifier entityId, String candidateName) throws Exception {
-        assertEquals("BatchedModifications size", 1, mods.getModifications().size());
-        assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
-        verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+        assertEquals("BatchedModifications size", 1, mods.size());
+        verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
+    }
+
+    private void verifyBatchedEntityCandidate(Modification mod, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) throws Exception {
+        assertEquals("Modification type", MergeModification.class, mod.getClass());
+        verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
                 entityId, candidateName);
     }
 
@@ -306,20 +347,39 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
     public static class MockLeader extends UntypedActor {
         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
-        volatile BatchedModifications receivedModifications;
+        List<Modification> receivedModifications = new ArrayList<>();
         volatile boolean sendReply = true;
+        volatile long delay;
 
         @Override
         public void onReceive(Object message) {
             if(message instanceof BatchedModifications) {
-                receivedModifications = (BatchedModifications) message;
-                modificationsReceived.countDown();
+                if(delay > 0) {
+                    Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
+                }
+
                 if(sendReply) {
+                    BatchedModifications mods = (BatchedModifications) message;
+                    synchronized (receivedModifications) {
+                        for(int i = 0; i < mods.getModifications().size(); i++) {
+                            receivedModifications.add(mods.getModifications().get(i));
+                            modificationsReceived.countDown();
+                        }
+                    }
+
                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
                 } else {
                     sendReply = true;
                 }
             }
         }
+
+        List<Modification> getAndClearReceivedModifications() {
+            synchronized (receivedModifications) {
+                List<Modification> ret = new ArrayList<>(receivedModifications);
+                receivedModifications.clear();
+                return ret;
+            }
+        }
     }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.