0776b502c24007b7c41edde6382a3c9eec3e48cb
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.mock;
16 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
35 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
41 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
44 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
45 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
46 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
47 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55
56 /**
57  * Unit tests for DistributedEntityOwnershipService.
58  *
59  * @author Thomas Pantelis
60  */
61 public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
62     static String ENTITY_TYPE = "test";
63     static String ENTITY_TYPE2 = "test2";
64     static int ID_COUNTER = 1;
65     static final QName QNAME = QName.create("test", "2015-08-11", "foo");
66
67     private final String dataStoreType = "config" + ID_COUNTER++;
68     private DistributedDataStore dataStore;
69
70     @Before
71     public void setUp() {
72         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
73                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
74
75         // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the
76         // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module.
77         MockConfiguration configuration = new MockConfiguration(Collections.<String, List<String>>emptyMap()) {
78             @Override
79             public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
80                 return Optional.of("entity-owners");
81             }
82
83             @Override
84             public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
85                 return ImmutableMap.<String, ShardStrategy>builder().put("entity-owners", new ShardStrategy() {
86                     @Override
87                     public String findShard(YangInstanceIdentifier path) {
88                         return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
89                     }
90                 }).build();
91             }
92         };
93
94         dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
95
96         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
97
98         ShardStrategyFactory.setConfiguration(configuration);
99     }
100
101     @After
102     public void tearDown() {
103         dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender());
104     }
105
106     @Test
107     public void testEntityOwnershipShardCreated() throws Exception {
108         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
109         service.start();
110
111         Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
112                 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
113         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
114         assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
115
116         service.close();
117     }
118
119     @Test
120     public void testRegisterCandidate() throws Exception {
121         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
122         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
123             @Override
124             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
125                 return shardPropsCreator;
126             }
127         };
128
129         service.start();
130
131         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
132
133         YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
134         Entity entity = new Entity(ENTITY_TYPE, entityId);
135         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
136
137         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
138
139         verifyEntityOwnershipCandidateRegistration(entity, reg);
140         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
141         verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
142                 dataStore.getActorContext().getCurrentMemberName());
143
144         // Register the same entity - should throw exception
145
146         EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
147         try {
148             service.registerCandidate(entity, candidate2);
149             fail("Expected CandidateAlreadyRegisteredException");
150         } catch(CandidateAlreadyRegisteredException e) {
151             // expected
152             assertSame("getCandidate", candidate, e.getRegisteredCandidate());
153             assertEquals("getEntity", entity, e.getEntity());
154         }
155
156         // Register a different entity - should succeed
157
158         Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
159         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
160
161         EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
162
163         verifyEntityOwnershipCandidateRegistration(entity2, reg2);
164         verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
165         verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
166                 dataStore.getActorContext().getCurrentMemberName());
167
168         service.close();
169     }
170
171     @Test
172     public void testCloseCandidateRegistration() throws Exception {
173         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
174         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
175             @Override
176             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
177                 return shardPropsCreator;
178             }
179         };
180
181         service.start();
182
183         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
184
185         Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
186         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
187
188         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
189
190         verifyEntityOwnershipCandidateRegistration(entity, reg);
191         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
192
193         shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class);
194
195         reg.close();
196
197         UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
198         assertEquals("getEntity", entity, unregCandidate.getEntity());
199         assertSame("getCandidate", candidate, unregCandidate.getCandidate());
200
201         // Re-register - should succeed.
202
203         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
204
205         service.registerCandidate(entity, candidate);
206
207         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
208
209         service.close();
210     }
211
212     @Test
213     public void testRegisterListener() {
214     }
215
216     private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
217         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
218         Stopwatch sw = Stopwatch.createStarted();
219         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
220             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
221             Optional<NormalizedNode<?, ?>> optional = readTx.read(ENTITY_OWNERS_PATH).
222                     checkedGet(5, TimeUnit.SECONDS);
223             if(optional.isPresent()) {
224                 return optional.get();
225             }
226
227             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
228         }
229
230         return null;
231     }
232
233     private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
234             EntityOwnershipCandidate candidate) {
235         RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
236         assertSame("getCandidate", candidate, regCandidate.getCandidate());
237         assertEquals("getEntity", entity, regCandidate.getEntity());
238     }
239
240     private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
241         assertNotNull("EntityOwnershipCandidateRegistration null", reg);
242         assertEquals("getEntity", entity, reg.getEntity());
243     }
244
245     static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
246         TestShardPropsCreator() {
247             super("member-1");
248         }
249
250         private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
251         private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
252         private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
253
254         @Override
255         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
256                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
257             return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
258                     schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
259         }
260
261         @SuppressWarnings("unchecked")
262         <T> T waitForShardMessage() {
263             assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
264                     Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
265             assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
266             return (T) receivedMessage.get();
267         }
268
269         void expectShardMessage(Class<?> ofType) {
270             messageReceived.set(new CountDownLatch(1));
271             receivedMessage.set(null);
272             messageClass.set(ofType);
273         }
274     }
275
276     static class TestEntityOwnershipShard extends EntityOwnershipShard {
277         private final AtomicReference<CountDownLatch> messageReceived;
278         private final AtomicReference<Object> receivedMessage;
279         private final AtomicReference<Class<?>> messageClass;
280
281         protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
282                 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
283                 AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
284                 AtomicReference<Object> receivedMessage) {
285             super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
286             this.messageClass = messageClass;
287             this.messageReceived = messageReceived;
288             this.receivedMessage = receivedMessage;
289         }
290
291         @Override
292         public void onReceiveCommand(final Object message) throws Exception {
293             try {
294                 super.onReceiveCommand(message);
295             } finally {
296                 Class<?> expMsgClass = messageClass.get();
297                 if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
298                     receivedMessage.set(message);
299                     messageReceived.get().countDown();
300                 }
301             }
302         }
303     }
304 }