Bug 4105: Add EntityOwnerDataChangeListener
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.mock;
16 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
35 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
41 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
42 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
43 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
44 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
45 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
46 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
47 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55
56 /**
57  * Unit tests for DistributedEntityOwnershipService.
58  *
59  * @author Thomas Pantelis
60  */
61 public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
62     static String ENTITY_TYPE = "test";
63     static String ENTITY_TYPE2 = "test2";
64     static int ID_COUNTER = 1;
65     static final QName QNAME = QName.create("test", "2015-08-11", "foo");
66
67     private final String dataStoreType = "config" + ID_COUNTER++;
68     private DistributedDataStore dataStore;
69
70     @Before
71     public void setUp() {
72         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
73                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
74
75         // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the
76         // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module.
77         MockConfiguration configuration = new MockConfiguration(Collections.<String, List<String>>emptyMap()) {
78             @Override
79             public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
80                 return Optional.of("entity-owners");
81             }
82
83             @Override
84             public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
85                 return ImmutableMap.<String, ShardStrategy>builder().put("entity-owners", new ShardStrategy() {
86                     @Override
87                     public String findShard(YangInstanceIdentifier path) {
88                         return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
89                     }
90                 }).build();
91             }
92         };
93
94         dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
95
96         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
97
98         ShardStrategyFactory.setConfiguration(configuration);
99     }
100
101     @After
102     public void tearDown() {
103         dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender());
104     }
105
106     @Test
107     public void testEntityOwnershipShardCreated() throws Exception {
108         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
109         service.start();
110
111         Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
112                 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
113         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
114         assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
115
116         service.close();
117     }
118
119     @Test
120     public void testRegisterCandidate() throws Exception {
121         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
122         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
123             @Override
124             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
125                 return shardPropsCreator;
126             }
127         };
128
129         service.start();
130
131         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
132
133         YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
134         Entity entity = new Entity(ENTITY_TYPE, entityId);
135         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
136
137         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
138
139         verifyEntityOwnershipCandidateRegistration(entity, reg);
140         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
141         verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
142                 dataStore.getActorContext().getCurrentMemberName());
143
144         // Register the same entity - should throw exception
145
146         EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
147         try {
148             service.registerCandidate(entity, candidate2);
149             fail("Expected CandidateAlreadyRegisteredException");
150         } catch(CandidateAlreadyRegisteredException e) {
151             // expected
152             assertSame("getCandidate", candidate, e.getRegisteredCandidate());
153             assertEquals("getEntity", entity, e.getEntity());
154         }
155
156         // Register a different entity - should succeed
157
158         Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
159         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
160
161         EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
162
163         verifyEntityOwnershipCandidateRegistration(entity2, reg2);
164         verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
165         verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
166                 dataStore.getActorContext().getCurrentMemberName());
167
168         service.close();
169     }
170
171     @Test
172     public void testCloseCandidateRegistration() throws Exception {
173         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
174         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
175             @Override
176             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
177                 return shardPropsCreator;
178             }
179         };
180
181         service.start();
182
183         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
184
185         Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
186         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
187
188         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
189
190         verifyEntityOwnershipCandidateRegistration(entity, reg);
191         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
192
193         shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class);
194
195         reg.close();
196
197         UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
198         assertEquals("getEntity", entity, unregCandidate.getEntity());
199
200         // Re-register - should succeed.
201
202         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
203
204         service.registerCandidate(entity, candidate);
205
206         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
207
208         service.close();
209     }
210
211     @Test
212     public void testRegisterListener() {
213     }
214
215     private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
216         Stopwatch sw = Stopwatch.createStarted();
217         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
218             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
219             Optional<NormalizedNode<?, ?>> optional = readTx.read(ENTITY_OWNERS_PATH).
220                     checkedGet(5, TimeUnit.SECONDS);
221             if(optional.isPresent()) {
222                 return optional.get();
223             }
224
225             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
226         }
227
228         return null;
229     }
230
231     private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
232             EntityOwnershipCandidate candidate) {
233         RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
234         assertSame("getCandidate", candidate, regCandidate.getCandidate());
235         assertEquals("getEntity", entity, regCandidate.getEntity());
236     }
237
238     private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
239         assertNotNull("EntityOwnershipCandidateRegistration null", reg);
240         assertEquals("getEntity", entity, reg.getEntity());
241     }
242
243     static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
244         TestShardPropsCreator() {
245             super("member-1");
246         }
247
248         private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
249         private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
250         private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
251
252         @Override
253         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
254                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
255             return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
256                     schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
257         }
258
259         @SuppressWarnings("unchecked")
260         <T> T waitForShardMessage() {
261             assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
262                     Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
263             assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
264             return (T) receivedMessage.get();
265         }
266
267         void expectShardMessage(Class<?> ofType) {
268             messageReceived.set(new CountDownLatch(1));
269             receivedMessage.set(null);
270             messageClass.set(ofType);
271         }
272     }
273
274     static class TestEntityOwnershipShard extends EntityOwnershipShard {
275         private final AtomicReference<CountDownLatch> messageReceived;
276         private final AtomicReference<Object> receivedMessage;
277         private final AtomicReference<Class<?>> messageClass;
278
279         protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
280                 DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
281                 AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
282                 AtomicReference<Object> receivedMessage) {
283             super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
284             this.messageClass = messageClass;
285             this.messageReceived = messageReceived;
286             this.receivedMessage = receivedMessage;
287         }
288
289         @Override
290         public void onReceiveCommand(final Object message) throws Exception {
291             try {
292                 super.onReceiveCommand(message);
293             } finally {
294                 Class<?> expMsgClass = messageClass.get();
295                 if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
296                     receivedMessage.set(message);
297                     messageReceived.get().countDown();
298                 }
299             }
300         }
301     }
302 }