import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
// this will cause all other messages to not be queued properly after that.
// The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
// it does do a persist)
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
"testRegisterDataTreeChangeListener-DataTreeChangeListener");
- shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterDataTreeChangeListenerReply.class);
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
+ return new Shard(Shard.builder().id(shardID).datastoreContext(
+ dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof ElectionTimeout && firstElectionTimeout) {
assertEquals("Got first ElectionTimeout", true,
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
- shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
TestShard() {
- super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
- newDatastoreContext(), SCHEMA_CONTEXT);
+ super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
+ schemaContext(SCHEMA_CONTEXT));
}
String getPeerAddress(String id) {
testkit.waitUntilLeader(shard);
- final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
}
DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
- final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
testStore.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
protected boolean isLeader() {
return overrideLeaderCalls.get() ? false : super.isLeader();
final Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+ return new Shard(newShardBuilder()) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
super.onReceiveCommand(message);
new ShardTestKit(getSystem()) {{
class TestShard extends Shard {
- protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ protected TestShard(AbstractBuilder<?, ?> builder) {
+ super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}
final Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new TestShard(shardID, Collections.<String,String>emptyMap(),
- newDatastoreContext(), SCHEMA_CONTEXT);
+ return new TestShard(newShardBuilder());
}
};
Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
// Trigger creation of a snapshot by ensuring
final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
-
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
- latch.set(new CountDownLatch(1));
- savedSnapshot.set(null);
+ awaitAndValidateSnapshot(expectedRoot);
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+ awaitAndValidateSnapshot(expectedRoot);
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+ ) throws InterruptedException {
+ System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+ assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
+ latch.set(new CountDownLatch(1));
+ savedSnapshot.set(null);
+ }
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
- assertEquals("Root node", expectedRoot, actual);
+ final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
- }};
+ }
+ };
}
/**
*/
@Test
public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
- final DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
final DataTreeModification putTransaction = store.takeSnapshot().newModification();
final DatastoreContext persistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
- final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- persistentContext, SCHEMA_CONTEXT);
+ final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
+ schemaContext(SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
- final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
- nonPersistentContext, SCHEMA_CONTEXT);
+ final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
+ schemaContext(SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
}
@Test
- public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+ public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- return new Shard(shardID, Collections.<String,String>emptyMap(),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
- if(message instanceof ElectionTimeout && firstElectionTimeout) {
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.onReceiveCommand(message);
- }
- }
- };
- }
- };
+ String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
- "testDataChangeListenerOnFollower-DataChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
- withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
- assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- shard.tell(new FindLeader(), getRef());
- final FindLeaderReply findLeadeReply =
- expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+ waitUntilNoLeader(shard);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- onChangeListenerRegistered.countDown();
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
- public void testClusteredDataChangeListernerRegistration() throws Exception {
+ public void testClusteredDataChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
- final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
- .shardName("inventory").type("config").build();
- final Creator<Shard> followerShardCreator = new Creator<Shard>() {
+ String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
- @Override
- public Shard create() throws Exception {
- return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
- "akka://test/user/" + member2ShardID.toString()),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
- if(!(message instanceof ElectionTimeout)) {
- super.onReceiveCommand(message);
- }
- }
- };
- }
- };
+ followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
- final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- @Override
- public Shard create() throws Exception {
- return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
- "akka://test/user/" + member1ShardID.toString()),
- dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
- }
- };
+ listener.waitForChangeEvents();
+ }};
+ }
+ @Test
+ public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(followerShardCreator)),
- member1ShardID.toString());
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
- final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
- member2ShardID.toString());
- // Sleep to let election happen
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
- shard.tell(new FindLeader(), getRef());
- final FindLeaderReply findLeaderReply =
- expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+ waitUntilNoLeader(shard);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
- "testDataChangeListenerOnFollower-DataChangeListener");
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
listener.waitForChangeEvents();
+ }};
+ }
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ @Test
+ public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
}};
}
+
+ @Test
+ public void testServerRemoved() throws Exception {
+ final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+
+ final ActorRef shard = parent.underlyingActor().context().actorOf(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testServerRemoved");
+
+ shard.tell(new ServerRemoved("test"), ActorRef.noSender());
+
+ MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
+
+ }
+
}