2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.Collections;
19 import java.util.List;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
27 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
28 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
29 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
31 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
33 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
34 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
35 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
46 * Unit tests for DistributedEntityOwnershipService.
48 * @author Thomas Pantelis
50 public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
51 static String ENTITY_TYPE = "test";
52 static String ENTITY_TYPE2 = "test2";
53 static int ID_COUNTER = 1;
54 static final QName QNAME = QName.create("test", "2015-08-11", "foo");
56 private final String dataStoreType = "config" + ID_COUNTER++;
57 private DistributedDataStore dataStore;
60 public void setUp() throws Exception {
61 DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
62 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
63 dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
64 new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
66 dataStore.onGlobalContextUpdated(TestModel.createTestContext());
70 public void testEntityOwnershipShardCreated() throws Exception {
71 DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
74 Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
75 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
76 ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
77 assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
83 public void testRegisterCandidate() throws Exception {
84 final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
85 DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
87 protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
88 return shardPropsCreator;
94 shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
96 Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
97 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
99 EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
101 verifyEntityOwnershipCandidateRegistration(entity, reg);
102 verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
104 // Test same entity - should throw exception
106 EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
108 service.registerCandidate(entity, candidate2);
109 fail("Expected CandidateAlreadyRegisteredException");
110 } catch(CandidateAlreadyRegisteredException e) {
112 assertSame("getCandidate", candidate, e.getRegisteredCandidate());
113 assertEquals("getEntity", entity, e.getEntity());
116 // Test different entity
118 Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
119 shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
121 EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
123 verifyEntityOwnershipCandidateRegistration(entity2, reg2);
124 verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
130 public void testRegisterListener() {
133 private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
134 EntityOwnershipCandidate candidate) {
135 RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
136 assertSame("getCandidate", candidate, regCandidate.getCandidate());
137 assertEquals("getEntity", entity, regCandidate.getEntity());
140 private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
141 assertNotNull("EntityOwnershipCandidateRegistration null", reg);
142 assertEquals("getEntity", entity, reg.getEntity());
145 static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
146 private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
147 private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
148 private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
151 public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
152 DatastoreContext datastoreContext, SchemaContext schemaContext) {
153 return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
154 schemaContext, messageClass, messageReceived, receivedMessage);
157 @SuppressWarnings("unchecked")
158 <T> T waitForShardMessage() {
159 assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly(
160 messageReceived.get(), 5, TimeUnit.SECONDS));
161 assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
162 return (T) receivedMessage.get();
165 void expectShardMessage(Class<?> ofType) {
166 messageReceived.set(new CountDownLatch(1));
167 receivedMessage.set(null);
168 messageClass.set(ofType);
172 static class TestEntityOwnershipShard extends EntityOwnershipShard {
173 private final AtomicReference<CountDownLatch> messageReceived;
174 private final AtomicReference<Object> receivedMessage;
175 private final AtomicReference<Class<?>> messageClass;
177 protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
178 DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
179 AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
180 super(name, peerAddresses, datastoreContext, schemaContext);
181 this.messageClass = messageClass;
182 this.messageReceived = messageReceived;
183 this.receivedMessage = receivedMessage;
187 public void onReceiveCommand(final Object message) throws Exception {
189 super.onReceiveCommand(message);
191 Class<?> expMsgClass = messageClass.get();
192 if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
193 receivedMessage.set(message);
194 messageReceived.get().countDown();