Bug 4105: Pass ModuleShardConfiguration with CreateShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.mock;
16 import akka.actor.ActorRef;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import com.google.common.base.Function;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
30 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
31 import org.opendaylight.controller.cluster.datastore.config.Configuration;
32 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
33 import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
34 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
35 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
39 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
40 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
41 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
42 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
43 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import scala.concurrent.Await;
49 import scala.concurrent.Future;
50 import scala.concurrent.duration.Duration;
51
52 /**
53  * Unit tests for DistributedEntityOwnershipService.
54  *
55  * @author Thomas Pantelis
56  */
57 public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
58     static String ENTITY_TYPE = "test";
59     static String ENTITY_TYPE2 = "test2";
60     static int ID_COUNTER = 1;
61     static final QName QNAME = QName.create("test", "2015-08-11", "foo");
62
63     private final String dataStoreType = "config" + ID_COUNTER++;
64     private DistributedDataStore dataStore;
65
66     @Before
67     public void setUp() {
68         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
69                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
70
71         Configuration configuration = new ConfigurationImpl(new ModuleShardConfigProvider() {
72             @Override
73             public Map<String, ModuleConfig> retrieveModuleConfigs(Configuration configuration) {
74                 return Collections.emptyMap();
75             }
76         });
77
78         dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
79
80         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
81     }
82
83     @After
84     public void tearDown() {
85         dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender());
86     }
87
88     @Test
89     public void testEntityOwnershipShardCreated() throws Exception {
90         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
91         service.start();
92
93         Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
94                 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
95         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
96         assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
97
98         service.close();
99     }
100
101     @Test
102     public void testRegisterCandidate() throws Exception {
103         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
104         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
105             @Override
106             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
107                 return shardPropsCreator;
108             }
109         };
110
111         service.start();
112
113         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
114
115         YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
116         Entity entity = new Entity(ENTITY_TYPE, entityId);
117         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
118
119         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
120
121         verifyEntityOwnershipCandidateRegistration(entity, reg);
122         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
123         verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE, entityId,
124                 dataStore.getActorContext().getCurrentMemberName());
125
126         // Register the same entity - should throw exception
127
128         EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
129         try {
130             service.registerCandidate(entity, candidate2);
131             fail("Expected CandidateAlreadyRegisteredException");
132         } catch(CandidateAlreadyRegisteredException e) {
133             // expected
134             assertSame("getCandidate", candidate, e.getRegisteredCandidate());
135             assertEquals("getEntity", entity, e.getEntity());
136         }
137
138         // Register a different entity - should succeed
139
140         Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
141         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
142
143         EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
144
145         verifyEntityOwnershipCandidateRegistration(entity2, reg2);
146         verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
147         verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE2, entityId,
148                 dataStore.getActorContext().getCurrentMemberName());
149
150         service.close();
151     }
152
153     @Test
154     public void testCloseCandidateRegistration() throws Exception {
155         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
156         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
157             @Override
158             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
159                 return shardPropsCreator;
160             }
161         };
162
163         service.start();
164
165         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
166
167         Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
168         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
169
170         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
171
172         verifyEntityOwnershipCandidateRegistration(entity, reg);
173         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
174
175         shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class);
176
177         reg.close();
178
179         UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
180         assertEquals("getEntity", entity, unregCandidate.getEntity());
181         assertSame("getCandidate", candidate, unregCandidate.getCandidate());
182
183         // Re-register - should succeed.
184
185         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
186
187         service.registerCandidate(entity, candidate);
188
189         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
190
191         service.close();
192     }
193
194     @Test
195     public void testRegisterListener() {
196         // TODO
197     }
198
199     private void verifyEntityCandidate(ActorRef entityOwnershipShard, String entityType,
200             YangInstanceIdentifier entityId, String candidateName) {
201         verifyEntityCandidate(entityType, entityId, candidateName,
202                 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
203                     @Override
204                     public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
205                         try {
206                             return dataStore.newReadOnlyTransaction().read(path).get(5, TimeUnit.SECONDS).get();
207                         } catch (Exception e) {
208                             return null;
209                         }
210                     }
211                 });
212     }
213
214     private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
215             EntityOwnershipCandidate candidate) {
216         RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
217         assertSame("getCandidate", candidate, regCandidate.getCandidate());
218         assertEquals("getEntity", entity, regCandidate.getEntity());
219     }
220
221     private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
222         assertNotNull("EntityOwnershipCandidateRegistration null", reg);
223         assertEquals("getEntity", entity, reg.getEntity());
224     }
225
226     static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
227         TestShardPropsCreator() {
228             super("member-1");
229         }
230
231         private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
232         private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
233         private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
234
235         @Override
236         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
237                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
238             return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
239                     schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
240         }
241
242         @SuppressWarnings("unchecked")
243         <T> T waitForShardMessage() {
244             assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
245                     Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
246             assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
247             return (T) receivedMessage.get();
248         }
249
250         void expectShardMessage(Class<?> ofType) {
251             messageReceived.set(new CountDownLatch(1));
252             receivedMessage.set(null);
253             messageClass.set(ofType);
254         }
255     }
256
257     static class TestEntityOwnershipShard extends EntityOwnershipShard {
258         private final AtomicReference<CountDownLatch> messageReceived;
259         private final AtomicReference<Object> receivedMessage;
260         private final AtomicReference<Class<?>> messageClass;
261
262         protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
263                 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
264                 AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
265                 AtomicReference<Object> receivedMessage) {
266             super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
267             this.messageClass = messageClass;
268             this.messageReceived = messageReceived;
269             this.receivedMessage = receivedMessage;
270         }
271
272         @Override
273         public void onReceiveCommand(final Object message) throws Exception {
274             try {
275                 super.onReceiveCommand(message);
276             } finally {
277                 Class<?> expMsgClass = messageClass.get();
278                 if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
279                     receivedMessage.set(message);
280                     messageReceived.get().countDown();
281                 }
282             }
283         }
284     }
285 }