import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps(), "testRegisterChangeListener");
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListener");
waitUntilLeader(shard);
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps(), "testRegisterDataTreeChangeListener");
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterDataTreeChangeListener");
waitUntilLeader(shard);
final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
final NormalizedNode<?,?> expected = readStore(store, root);
- final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
+ final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
@Test
public void testApplyState() throws Exception {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
ShardTestKit.waitUntilLeader(shard);
writeMod.write(TestModel.TEST_PATH, node);
writeMod.ready();
- final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
- new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
+ final TransactionIdentifier tx = nextTransactionId();
+ final ApplyState applyState = new ApplyState(null, tx,
+ new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
shard.tell(applyState, shard);
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
+ payloadForModification(source, writeMod, nextTransactionId())));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
final DataTreeModification mod = source.takeSnapshot().newModification();
mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
mod.ready();
+
InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
- payloadForModification(source, mod)));
+ payloadForModification(source, mod, nextTransactionId())));
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
};
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+ Props.create(new DelegatingShardCreator(creator)).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), "testOnBatchedModificationsWhenNotLeader");
waitUntilLeader(shard);
testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
}
- @SuppressWarnings("serial")
private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)), shardActorName);
+ Props.create(new DelegatingShardCreator(creator)).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
waitUntilLeader(shard);
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
awaitAndValidateSnapshot(expectedRoot);
}
- private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
savedSnapshot.set(null);
}
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
-
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) throws IOException {
+ final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode().get();
assertEquals("Root node", expectedRoot, actual);
-
}};
}