1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.ActorSystem;
7 import akka.actor.Props;
8 import akka.dispatch.ExecutionContexts;
9 import akka.dispatch.Futures;
10 import akka.util.Timeout;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
17 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
20 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
21 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
22 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
26 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.ListenerRegistration;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import scala.concurrent.ExecutionContextExecutor;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.FiniteDuration;
38 import java.util.concurrent.TimeUnit;
40 import static junit.framework.TestCase.assertEquals;
41 import static junit.framework.TestCase.assertNull;
42 import static org.junit.Assert.assertNotNull;
43 import static org.junit.Assert.assertTrue;
44 import static org.mockito.Matchers.any;
45 import static org.mockito.Matchers.anyObject;
46 import static org.mockito.Matchers.anyString;
47 import static org.mockito.Matchers.eq;
48 import static org.mockito.Mockito.mock;
49 import static org.mockito.Mockito.verify;
50 import static org.mockito.Mockito.when;
/**
 * Tests for {@link DistributedDataStore}: construction, change-listener
 * registration, and creation of transactions and transaction chains.
 */
52 public class DistributedDataStoreTest extends AbstractActorTest{
// Store under test. Assigned in setUp(); several tests replace it with an
// instance built on a Mockito-mocked ActorContext.
54 private DistributedDataStore distributedDataStore;
// Actor context passed to the default store created in setUp().
55 private MockActorContext mockActorContext;
// Reference to the DoNothingActor created in setUp(); the tests hand it out
// wherever an ActorRef is required.
56 private ActorRef doNothingActorRef;
/**
 * Builds the default fixture: installs a MockConfiguration in
 * ShardStrategyFactory, creates a DoNothingActor, and constructs a
 * DistributedDataStore on a MockActorContext that is given that actor.
 *
 * @throws Exception declared by the signature
 */
59 public void setUp() throws Exception {
60 ShardStrategyFactory.setConfiguration(new MockConfiguration());
61 final Props props = Props.create(DoNothingActor.class);
// The actor is created in the actor system returned by getSystem().
63 doNothingActorRef = getSystem().actorOf(props);
65 mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
66 distributedDataStore = new DistributedDataStore(mockActorContext);
// Pass the store the context produced by TestModel.createTestContext().
67 distributedDataStore.onGlobalContextUpdated(
68 TestModel.createTestContext());
70 // Make CreateTransactionReply the default response. It will need to be
71 // adjusted if a specific test requires some other response.
// The reply carries doNothingActorRef's path as its transaction actor path.
// NOTE(review): the transaction id literal below ends with a space - confirm
// that this is intentional.
72 mockActorContext.setExecuteShardOperationResponse(
73 CreateTransactionReply.newBuilder()
74 .setTransactionActorPath(doNothingActorRef.path().toString())
75 .setTransactionId("txn-1 ")
/**
 * Counterpart to setUp(). No teardown statements are visible in this block.
 *
 * @throws Exception declared by the signature
 */
80 public void tearDown() throws Exception {
/**
 * Checks that the five-argument DistributedDataStore constructor, given the
 * string "config" as its second argument, calls actorOf on the supplied
 * ActorSystem with the actor name "shardmanager-config".
 */
84 @SuppressWarnings("resource")
86 public void testConstructor(){
87 ActorSystem actorSystem = mock(ActorSystem.class);
// The constructed instance is discarded; only the constructor's interaction
// with the mocked ActorSystem is checked.
89 new DistributedDataStore(actorSystem, "config",
90 mock(ClusterWrapper.class), mock(Configuration.class),
91 new DatastoreContext());
// Any Props instance is accepted; the actor name must match exactly.
93 verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
/**
 * Registers a change listener for TestModel.TEST_PATH on the store built in
 * setUp() and expects a NoOpDataChangeListenerRegistration in return.
 *
 * @throws Exception declared by the signature
 */
97 public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
99 ListenerRegistration registration =
100 distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
// The listener throws if it is ever notified of a change.
102 public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
103 throw new UnsupportedOperationException("onDataChanged");
105 }, AsyncDataBroker.DataChangeScope.BASE);
107 // Since we do not expect the shard to be local, registration will return a NoOpRegistration
108 assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
// NOTE(review): instanceof is false for null, so once the assertion above has
// passed this null check can no longer fail.
110 assertNotNull(registration);
/**
 * Rebuilds the store on a mocked ActorContext whose findLocalShard returns
 * doNothingActorRef, registers a mock change listener, and expects the
 * registration to be a DataChangeListenerRegistrationProxy.
 *
 * @throws Exception declared by the signature
 */
114 public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
115 ActorContext actorContext = mock(ActorContext.class);
// Replace the store created in setUp() with one backed by the mock.
117 distributedDataStore = new DistributedDataStore(actorContext);
118 distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
// A plain Mockito mock; no behaviour is stubbed on it in this block.
120 Future future = mock(Future.class);
121 when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
122 when(actorContext.getActorSystem()).thenReturn(getSystem());
// Any shard name resolves to doNothingActorRef.
123 when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
// An async operation sent to doNothingActorRef returns the mocked future.
// NOTE(review): the receiver that opens this stubbing expression is not
// visible here; presumably actorContext - confirm.
125 .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
127 ListenerRegistration registration =
128 distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
129 mock(AsyncDataChangeListener.class),
130 AsyncDataBroker.DataChangeScope.BASE);
132 assertNotNull(registration);
// The concrete class must be exactly the proxy type.
134 assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
/**
 * Registers a change listener while the stubbed async operation returns an
 * already-successful future holding a RegisterChangeListenerReply, then
 * expects the returned proxy to expose the mocked ActorSelection as its
 * listener registration actor.
 *
 * @throws Exception declared by the signature
 */
138 public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
139 ActorContext actorContext = mock(ActorContext.class);
// Replace the store created in setUp() with one backed by the mock.
141 distributedDataStore = new DistributedDataStore(actorContext);
142 distributedDataStore.onGlobalContextUpdated(
143 TestModel.createTestContext());
// Execution context backed by Guava's same-thread executor; it is returned
// from the mocked ActorSystem's dispatcher() below. Presumably this makes
// future callbacks run before the assertions - confirm.
145 ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
147 // Make Future successful
148 Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
151 ActorSystem actorSystem = mock(ActorSystem.class);
152 ActorSelection actorSelection = mock(ActorSelection.class);
154 when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
155 when(actorSystem.dispatcher()).thenReturn(executor);
156 when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
157 when(actorContext.getActorSystem()).thenReturn(actorSystem);
// Any shard name resolves to doNothingActorRef.
158 when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
// An async operation sent to doNothingActorRef returns the successful future.
// NOTE(review): the receiver that opens this stubbing expression is not
// visible here; presumably actorContext - confirm.
160 .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
// Any ActorPath lookup yields the mocked selection checked at the end.
161 when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
163 ListenerRegistration registration =
164 distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
165 mock(AsyncDataChangeListener.class),
166 AsyncDataBroker.DataChangeScope.BASE);
168 assertNotNull(registration);
170 assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
172 ActorSelection listenerRegistrationActor =
173 ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
175 assertNotNull(listenerRegistrationActor);
// The proxy must hold the selection returned by actorContext.actorSelection.
177 assertEquals(actorSelection, listenerRegistrationActor);
/**
 * Registers a change listener while the stubbed async operation returns an
 * already-failed future, then expects a DataChangeListenerRegistrationProxy
 * whose listener registration actor is null.
 *
 * @throws Exception declared by the signature
 */
181 public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
182 ActorContext actorContext = mock(ActorContext.class);
// Replace the store created in setUp() with one backed by the mock.
184 distributedDataStore = new DistributedDataStore(actorContext);
185 distributedDataStore.onGlobalContextUpdated(
186 TestModel.createTestContext());
// Execution context backed by Guava's same-thread executor; it is returned
// from the mocked ActorSystem's dispatcher() below.
188 ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
// The future is already failed with an IllegalArgumentException.
191 Future f = Futures.failed(new IllegalArgumentException());
194 ActorSystem actorSystem = mock(ActorSystem.class);
195 ActorSelection actorSelection = mock(ActorSelection.class);
197 when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
198 when(actorSystem.dispatcher()).thenReturn(executor);
199 when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
200 when(actorContext.getActorSystem()).thenReturn(actorSystem);
// Any shard name resolves to doNothingActorRef.
201 when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
// An async operation sent to doNothingActorRef returns the failed future.
// NOTE(review): the receiver that opens this stubbing expression is not
// visible here; presumably actorContext - confirm.
203 .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
// Stubbed as in the success case, but the test expects the proxy never to
// end up holding this selection.
204 when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
206 ListenerRegistration registration =
207 distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
208 mock(AsyncDataChangeListener.class),
209 AsyncDataBroker.DataChangeScope.BASE);
// A proxy is still returned even though the future failed.
211 assertNotNull(registration);
213 assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
215 ActorSelection listenerRegistrationActor =
216 ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
// With a failed reply the proxy is expected to have no registration actor.
218 assertNull(listenerRegistrationActor);
/**
 * Checks that createTransactionChain() on the store built in setUp()
 * returns a non-null DOMStoreTransactionChain.
 *
 * @throws Exception declared by the signature
 */
224 public void testCreateTransactionChain() throws Exception {
225 final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
226 assertNotNull(transactionChain);
/**
 * Checks that newReadOnlyTransaction() on the store built in setUp()
 * returns a non-null DOMStoreReadTransaction.
 *
 * @throws Exception declared by the signature
 */
230 public void testNewReadOnlyTransaction() throws Exception {
231 final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
232 assertNotNull(transaction);
/**
 * Checks that newWriteOnlyTransaction() on the store built in setUp()
 * returns a non-null DOMStoreWriteTransaction.
 *
 * @throws Exception declared by the signature
 */
236 public void testNewWriteOnlyTransaction() throws Exception {
237 final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
238 assertNotNull(transaction);
/**
 * Checks that newReadWriteTransaction() on the store built in setUp()
 * returns a non-null DOMStoreReadWriteTransaction.
 *
 * @throws Exception declared by the signature
 */
242 public void testNewReadWriteTransaction() throws Exception {
243 final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
244 assertNotNull(transaction);