1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Optional;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.mockito.ArgumentMatcher;
17 import org.mockito.Mock;
18 import org.mockito.MockitoAnnotations;
20 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
21 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
24 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
25 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.FiniteDuration;
54 import java.util.Arrays;
55 import java.util.concurrent.TimeUnit;
57 import static org.mockito.Matchers.any;
58 import static org.mockito.Matchers.anyString;
59 import static org.mockito.Mockito.doReturn;
60 import static org.mockito.Mockito.doThrow;
61 import static org.mockito.Mockito.argThat;
62 import static org.mockito.Mockito.eq;
63 import static org.mockito.Mockito.verify;
64 import static org.mockito.Mockito.isA;
66 @SuppressWarnings("resource")
67 public class TransactionProxyTest extends AbstractActorTest {
69 @SuppressWarnings("serial")
70 static class TestException extends RuntimeException {
73 static interface Invoker {
74 void invoke(TransactionProxy proxy) throws Exception;
77 private final Configuration configuration = new MockConfiguration();
80 private ActorContext mockActorContext;
82 private SchemaContext schemaContext;
84 String memberName = "mock-member";
88 MockitoAnnotations.initMocks(this);
90 schemaContext = TestModel.createTestContext();
92 doReturn(getSystem()).when(mockActorContext).getActorSystem();
94 ShardStrategyFactory.setConfiguration(configuration);
97 private CreateTransaction eqCreateTransaction(final String memberName,
98 final TransactionType type) {
99 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
101 public boolean matches(Object argument) {
102 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
103 return obj.getTransactionId().startsWith(memberName) &&
104 obj.getTransactionType() == type.ordinal();
108 return argThat(matcher);
111 private DataExists eqDataExists() {
112 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
114 public boolean matches(Object argument) {
115 DataExists obj = DataExists.fromSerializable(argument);
116 return obj.getPath().equals(TestModel.TEST_PATH);
120 return argThat(matcher);
123 private ReadData eqReadData() {
124 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
126 public boolean matches(Object argument) {
127 ReadData obj = ReadData.fromSerializable(argument);
128 return obj.getPath().equals(TestModel.TEST_PATH);
132 return argThat(matcher);
135 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
136 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
138 public boolean matches(Object argument) {
139 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
140 return obj.getPath().equals(TestModel.TEST_PATH) &&
141 obj.getData().equals(nodeToWrite);
145 return argThat(matcher);
148 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
149 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
151 public boolean matches(Object argument) {
152 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
153 return obj.getPath().equals(TestModel.TEST_PATH) &&
154 obj.getData().equals(nodeToWrite);
158 return argThat(matcher);
161 private DeleteData eqDeleteData() {
162 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
164 public boolean matches(Object argument) {
165 DeleteData obj = DeleteData.fromSerializable(argument);
166 return obj.getPath().equals(TestModel.TEST_PATH);
170 return argThat(matcher);
173 private Object readyTxReply(ActorPath path) {
174 return new ReadyTransactionReply(path).toSerializable();
177 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
178 return Futures.successful(new ReadDataReply(schemaContext, data)
182 private Future<Object> dataExistsReply(boolean exists) {
183 return Futures.successful(new DataExistsReply(exists).toSerializable());
186 private ActorSelection actorSelection(ActorRef actorRef) {
187 return getSystem().actorSelection(actorRef.path());
190 private FiniteDuration anyDuration() {
191 return any(FiniteDuration.class);
194 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
195 return CreateTransactionReply.newBuilder()
196 .setTransactionActorPath(actorRef.path().toString())
197 .setTransactionId("txn-1").build();
200 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
201 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
202 doReturn(getSystem().actorSelection(actorRef.path())).
203 when(mockActorContext).actorSelection(actorRef.path().toString());
204 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
205 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
206 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
207 eqCreateTransaction(memberName, type), anyDuration());
208 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
209 anyString(), eq(actorRef.path().toString()));
210 doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
216 public void testRead() throws Exception {
217 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
219 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
220 READ_ONLY, schemaContext);
222 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
223 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
225 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
226 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
228 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
230 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
232 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
233 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
235 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
237 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
239 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
242 @Test(expected = ReadFailedException.class)
243 public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
244 setupActorContextWithInitialCreateTransaction(READ_ONLY);
246 doReturn(Futures.successful(new Object())).when(mockActorContext).
247 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
249 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
250 READ_ONLY, schemaContext);
252 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
255 @Test(expected = TestException.class)
256 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
257 setupActorContextWithInitialCreateTransaction(READ_ONLY);
259 doThrow(new TestException()).when(mockActorContext).
260 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
262 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
263 READ_ONLY, schemaContext);
266 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
267 fail("Expected ReadFailedException");
268 } catch(ReadFailedException e) {
269 // Expected - throw cause - expects TestException.
274 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
277 doThrow(exToThrow).when(mockActorContext).executeShardOperation(
278 anyString(), any(), anyDuration());
280 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
281 READ_ONLY, schemaContext);
284 invoker.invoke(transactionProxy);
285 fail("Expected ReadFailedException");
286 } catch(ReadFailedException e) {
287 // Expected - throw cause - expects TestException.
292 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
293 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
295 public void invoke(TransactionProxy proxy) throws Exception {
296 proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
301 @Test(expected = PrimaryNotFoundException.class)
302 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
303 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
306 @Test(expected = TimeoutException.class)
307 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
308 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
309 new Exception("reason")));
312 @Test(expected = TestException.class)
313 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
314 testReadWithExceptionOnInitialCreateTransaction(new TestException());
318 public void testExists() throws Exception {
319 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
321 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
322 READ_ONLY, schemaContext);
324 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
325 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
327 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
329 assertEquals("Exists response", false, exists);
331 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
332 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
334 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
336 assertEquals("Exists response", true, exists);
339 @Test(expected = PrimaryNotFoundException.class)
340 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
341 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
343 public void invoke(TransactionProxy proxy) throws Exception {
344 proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
349 @Test(expected = ReadFailedException.class)
350 public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
351 setupActorContextWithInitialCreateTransaction(READ_ONLY);
353 doReturn(Futures.successful(new Object())).when(mockActorContext).
354 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
356 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
357 READ_ONLY, schemaContext);
359 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
362 @Test(expected = TestException.class)
363 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
364 setupActorContextWithInitialCreateTransaction(READ_ONLY);
366 doThrow(new TestException()).when(mockActorContext).
367 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
369 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
370 READ_ONLY, schemaContext);
373 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
374 fail("Expected ReadFailedException");
375 } catch(ReadFailedException e) {
376 // Expected - throw cause - expects TestException.
382 public void testWrite() throws Exception {
383 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
385 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
386 WRITE_ONLY, schemaContext);
388 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
392 verify(mockActorContext).sendRemoteOperationAsync(
393 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
397 public void testMerge() throws Exception {
398 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
400 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
401 WRITE_ONLY, schemaContext);
403 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
405 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
407 verify(mockActorContext).sendRemoteOperationAsync(
408 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
412 public void testDelete() throws Exception {
413 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
415 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
416 WRITE_ONLY, schemaContext);
418 transactionProxy.delete(TestModel.TEST_PATH);
420 verify(mockActorContext).sendRemoteOperationAsync(
421 eq(actorSelection(actorRef)), eqDeleteData());
424 @SuppressWarnings("unchecked")
426 public void testReady() throws Exception {
427 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
429 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
430 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
432 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
433 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
435 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
436 READ_WRITE, schemaContext);
438 transactionProxy.read(TestModel.TEST_PATH);
440 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
442 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
444 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
446 assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
450 public void testGetIdentifier() {
451 setupActorContextWithInitialCreateTransaction(READ_ONLY);
452 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
453 TransactionProxy.TransactionType.READ_ONLY, schemaContext);
455 Object id = transactionProxy.getIdentifier();
456 assertNotNull("getIdentifier returned null", id);
457 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
460 @SuppressWarnings("unchecked")
462 public void testClose() throws Exception{
463 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
465 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
466 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
468 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
469 READ_WRITE, schemaContext);
471 transactionProxy.read(TestModel.TEST_PATH);
473 transactionProxy.close();
475 verify(mockActorContext).sendRemoteOperationAsync(
476 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));