1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import org.junit.Assert;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
18 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
21 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
22 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
24 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
31 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
32 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashMap;
44 import java.util.concurrent.ExecutionException;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
// Tests for the Shard actor. Most tests below build the actor from
// Shard.props(...) and then either message it from a JavaTestKit or call it
// directly through a TestActorRef.
45 public class ShardTest extends AbstractActorTest {
// Datastore configuration handed to the Shard.props(...) calls in these tests.
52 private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
// Registers a change listener on a newly created shard and checks the two
// messages the test kit then receives: an EnableNotification whose flag is
// false, followed by a RegisterChangeListenerReply whose registration path
// lies under the shard actor.
55 public void testOnReceiveRegisterListener() throws Exception {
56 new JavaTestKit(getSystem()) {{
57 final ShardIdentifier identifier =
58 ShardIdentifier.builder().memberName("member-1")
59 .shardName("inventory").type("config").build();
// The shard is created with an empty map as its second (peer address) argument.
61 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
62 final ActorRef subject =
63 getSystem().actorOf(props, "testRegisterChangeListener");
// Everything inside run() is bounded by a 3 second window.
65 new Within(duration("3 seconds")) {
67 protected void run() {
// Schema context update built from SchemaContextHelper.full().
70 new UpdateSchemaContext(SchemaContextHelper.full()),
// Register for BASE-scope changes on TEST_PATH, naming this test kit's own
// actor path as the listener.
73 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
74 getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
// First expected message: EnableNotification; its enabled flag is extracted
// for the assertion that follows.
77 final Boolean notificationEnabled = new ExpectMsg<Boolean>(
78 duration("3 seconds"), "enable notification") {
79 // do not put code outside this method, will run afterwards
81 protected Boolean match(Object in) {
82 if(in instanceof EnableNotification){
83 return ((EnableNotification) in).isEnabled();
88 }.get(); // this extracts the received message
90 assertFalse(notificationEnabled);
// Second expected message: the registration reply, matched by exact class;
// its listener registration path is captured as a String.
92 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
93 // do not put code outside this method, will run afterwards
95 protected String match(Object in) {
96 if (in.getClass().equals(RegisterChangeListenerReply.class)) {
97 RegisterChangeListenerReply reply =
98 (RegisterChangeListenerReply) in;
99 return reply.getListenerRegistrationPath()
105 }.get(); // this extracts the received message
// The registration path must be a child of the shard actor whose name
// starts with '$'.
107 assertTrue(out.matches(
108 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Sends a serialized read-only CreateTransaction for "txn-1" to a shard after
// the shard has logged its switch to Leader, and checks that the path in the
// CreateTransactionReply contains ".../testCreateTransaction/shard-txn-1".
117 public void testCreateTransaction(){
118 new JavaTestKit(getSystem()) {{
119 final ShardIdentifier identifier =
120 ShardIdentifier.builder().memberName("member-1")
121 .shardName("inventory").type("config").build();
// The shard is created with an empty map as its second (peer address) argument.
123 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
124 final ActorRef subject =
125 getSystem().actorOf(props, "testCreateTransaction");
// The filter below expects exactly one Info event from the shard's path with
// the Candidate-to-Leader message; its boolean result is asserted afterwards.
127 // Wait for a specific log message to show up
128 final boolean result =
129 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
132 protected Boolean run() {
135 }.from(subject.path().toString())
136 .message("Switching from state Candidate to Leader")
137 .occurrences(1).exec();
139 Assert.assertEquals(true, result);
// Everything inside run() is bounded by a 3 second window.
141 new Within(duration("3 seconds")) {
143 protected void run() {
// Schema context update built from the test model.
146 new UpdateSchemaContext(TestModel.createTestContext()),
// The transaction type is passed as the ordinal of TransactionType.READ_ONLY,
// and the message is converted with toSerializable() before sending.
149 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
// Expect a CreateTransactionReply and capture its transaction actor path.
152 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
153 // do not put code outside this method, will run afterwards
155 protected String match(Object in) {
156 if (in instanceof CreateTransactionReply) {
157 CreateTransactionReply reply =
158 (CreateTransactionReply) in;
159 return reply.getTransactionActorPath()
165 }.get(); // this extracts the received message
// contains(...) is used, so any suffix after "shard-txn-1" is accepted.
167 assertTrue("Unexpected transaction path " + out,
168 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Same flow as a plain create-transaction request, except that the
// CreateTransaction is built with a third argument, "foobar".
// NOTE(review): presumably the transaction chain id, given the test name;
// confirm against the CreateTransaction constructor.
176 public void testCreateTransactionOnChain(){
177 new JavaTestKit(getSystem()) {{
178 final ShardIdentifier identifier =
179 ShardIdentifier.builder().memberName("member-1")
180 .shardName("inventory").type("config").build();
// The shard is created with an empty map as its second (peer address) argument.
182 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
183 final ActorRef subject =
184 getSystem().actorOf(props, "testCreateTransactionOnChain");
// The filter below expects exactly one Info event from the shard's path with
// the Candidate-to-Leader message; its boolean result is asserted afterwards.
186 // Wait for a specific log message to show up
187 final boolean result =
188 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
191 protected Boolean run() {
194 }.from(subject.path().toString())
195 .message("Switching from state Candidate to Leader")
196 .occurrences(1).exec();
198 Assert.assertEquals(true, result);
// Everything inside run() is bounded by a 3 second window.
200 new Within(duration("3 seconds")) {
202 protected void run() {
// Schema context update built from the test model.
205 new UpdateSchemaContext(TestModel.createTestContext()),
// Read-only transaction "txn-1" (type passed as enum ordinal) with the extra
// "foobar" argument, converted with toSerializable() before sending.
208 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
// Expect a CreateTransactionReply and capture its transaction actor path.
211 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
212 // do not put code outside this method, will run afterwards
214 protected String match(Object in) {
215 if (in instanceof CreateTransactionReply) {
216 CreateTransactionReply reply =
217 (CreateTransactionReply) in;
218 return reply.getTransactionActorPath()
224 }.get(); // this extracts the received message
// contains(...) is used, so any suffix after "shard-txn-1" is accepted.
226 assertTrue("Unexpected transaction path " + out,
227 out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Creates a shard whose peer-address map holds one entry with a null address,
// then builds a PeerAddressResolved message giving that entry the address
// "akka://foobar".
// NOTE(review): no assertion is visible in this method; presumably the test
// only checks that the message is handled without failure - confirm.
235 public void testPeerAddressResolved(){
236 new JavaTestKit(getSystem()) {{
237 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
239 final ShardIdentifier identifier =
240 ShardIdentifier.builder().memberName("member-1")
241 .shardName("inventory").type("config").build();
// The peer entry is keyed by the same identifier that is used for the shard
// itself, and starts out with no address.
243 peerAddresses.put(identifier, null);
244 final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
245 final ActorRef subject =
246 getSystem().actorOf(props, "testPeerAddressResolved");
// Everything inside run() is bounded by a 3 second window.
248 new Within(duration("3 seconds")) {
250 protected void run() {
253 new PeerAddressResolved(identifier, "akka://foobar"),
// Round-trips the shard's store through applySnapshot: the content read from
// the store before the snapshot is applied must equal the content read
// afterwards.
263 public void testApplySnapshot() throws ExecutionException, InterruptedException {
264 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
266 final ShardIdentifier identifier =
267 ShardIdentifier.builder().memberName("member-1")
268 .shardName("inventory").type("config").build();
270 peerAddresses.put(identifier, null);
271 final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
// A TestActorRef is used so the Shard instance can be called directly through
// underlyingActor() instead of by message.
273 TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
275 ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
277 NormalizedNodeToNodeCodec codec =
278 new NormalizedNodeToNodeCodec(TestModel.createTestContext());
// Seed the store with a container node for TEST_QNAME at TEST_PATH.
280 ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Whatever the store returns now is the state the snapshot must reproduce.
282 NormalizedNode expected = ref.underlyingActor().readStore();
// Encode the expected node against an empty YangInstanceIdentifier (no path
// arguments added to the builder).
284 NormalizedNodeMessages.Container encode = codec
285 .encode(YangInstanceIdentifier.builder().build(), expected);
// Feed the encoded node's bytes back to the shard as the snapshot.
288 ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
290 NormalizedNode actual = ref.underlyingActor().readStore();
292 assertEquals(expected, actual);
// JavaTestKit subclass that adds a reusable helper for waiting on a log
// message from an actor.
295 private static class ShardTestKit extends JavaTestKit {
297 private ShardTestKit(ActorSystem actorSystem) {
// Runs an EventFilter for events of class logLevel that originate from
// subject's actor path, expecting exactly one occurrence, and asserts that
// the filter's result is true.
//   logLevel   - logging event class to filter on (e.g. Logging.Info.class)
//   subject    - actor whose path is used as the event source
//   logMessage - log text the caller is waiting for
301 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
302 // Wait for a specific log message to show up
303 final boolean result =
304 new JavaTestKit.EventFilter<Boolean>(logLevel
307 protected Boolean run() {
310 }.from(subject.path().toString())
312 .occurrences(1).exec();
314 Assert.assertEquals(true, result);
// Sends CaptureSnapshot(-1,-1,-1,-1) to a shard that has logged its switch to
// Leader, then waits for the Debug log entry "CaptureSnapshotReply received by
// actor". deletePersistenceFiles() is called at the end of the method.
321 public void testCreateSnapshot() throws IOException, InterruptedException {
322 new ShardTestKit(getSystem()) {{
323 final ShardIdentifier identifier =
324 ShardIdentifier.builder().memberName("member-1")
325 .shardName("inventory").type("config").build();
// The shard is created with an empty map as its second (peer address) argument.
327 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
328 final ActorRef subject =
329 getSystem().actorOf(props, "testCreateSnapshot");
331 // Wait for a specific log message to show up
332 this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
// Everything inside run() is bounded by a 3 second window.
335 new Within(duration("3 seconds")) {
337 protected void run() {
// Schema context update built from the test model.
340 new UpdateSchemaContext(TestModel.createTestContext()),
343 subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
// Completion is detected through the shard's Debug-level log output rather
// than through a reply message.
346 waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
351 deletePersistenceFiles();
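356 * Verifies the approach behind applySnapshot on a plain InMemoryDOMDataStore: deleting and then rewriting a previously read node must leave the store contents unchanged.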
357 * @throws ReadFailedException
// Works on an InMemoryDOMDataStore directly (no Shard actor involved): writes
// a node, reads the store, deletes and rewrites what was read in a single
// transaction, and checks that a second read equals the first.
360 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Both executors passed to the store are same-thread executors.
361 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
362 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
364 store.onGlobalContextUpdated(TestModel.createTestContext());
// Seed the store with a container node for TEST_QNAME at TEST_PATH.
366 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
367 putTransaction.write(TestModel.TEST_PATH,
368 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
369 commitTransaction(putTransaction);
// State of the store after the initial write; this is what must be restored.
372 NormalizedNode expected = readStore(store);
374 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
// Delete and rewrite at the same empty YangInstanceIdentifier, inside one
// write-only transaction.
376 writeTransaction.delete(YangInstanceIdentifier.builder().build());
377 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
379 commitTransaction(writeTransaction);
381 NormalizedNode actual = readStore(store);
383 assertEquals(expected, actual);
// Reads the node at an empty YangInstanceIdentifier from the given store using
// a new read-only transaction and returns it.
//   store - data store to read from
//   returns the node held by the Optional that the read produced
//   throws ReadFailedException - declared by this method; read.checkedGet()
//                                is the call that uses that exception type
387 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
388 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
389 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
390 transaction.read(YangInstanceIdentifier.builder().build());
392 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// get() is called without a visible presence check, so this helper assumes
// the store already contains data.
394 NormalizedNode<?, ?> normalizedNode = optional.get();
398 return normalizedNode;
// Readies the given write transaction and drives the resulting three-phase
// commit cohort through preCommit() and then commit().
// InterruptedException and ExecutionException are caught inside this method;
// the method itself declares no checked exceptions.
//   transaction - write transaction to commit
401 private void commitTransaction(DOMStoreWriteTransaction transaction) {
402 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
403 ListenableFuture<Void> future =
404 commitCohort.preCommit();
// The same future variable is reused for the commit phase.
407 future = commitCohort.commit();
409 } catch (InterruptedException | ExecutionException e) {
413 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
414 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
416 public void onDataChanged(
417 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {