Bug 4105: Implement DistributedEntityOwnershipService#registerCandidate
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
27 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
28 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
29 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
31 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
33 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
34 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
35 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44
45 /**
46  * Unit tests for DistributedEntityOwnershipService.
47  *
48  * @author Thomas Pantelis
49  */
50 public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
51     static String ENTITY_TYPE = "test";
52     static String ENTITY_TYPE2 = "test2";
53     static int ID_COUNTER = 1;
54     static final QName QNAME = QName.create("test", "2015-08-11", "foo");
55
56     private final String dataStoreType = "config" + ID_COUNTER++;
57     private DistributedDataStore dataStore;
58
59     @Before
60     public void setUp() throws Exception {
61         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
62                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
63         dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
64                 new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
65
66         dataStore.onGlobalContextUpdated(TestModel.createTestContext());
67     }
68
69     @Test
70     public void testEntityOwnershipShardCreated() throws Exception {
71         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
72         service.start();
73
74         Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
75                 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
76         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
77         assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
78
79         service.close();
80     }
81
82     @Test
83     public void testRegisterCandidate() throws Exception {
84         final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
85         DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
86             @Override
87             protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
88                 return shardPropsCreator;
89             }
90         };
91
92         service.start();
93
94         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
95
96         Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
97         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
98
99         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
100
101         verifyEntityOwnershipCandidateRegistration(entity, reg);
102         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
103
104         // Test same entity - should throw exception
105
106         EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
107         try {
108             service.registerCandidate(entity, candidate2);
109             fail("Expected CandidateAlreadyRegisteredException");
110         } catch(CandidateAlreadyRegisteredException e) {
111             // expected
112             assertSame("getCandidate", candidate, e.getRegisteredCandidate());
113             assertEquals("getEntity", entity, e.getEntity());
114         }
115
116         // Test different entity
117
118         Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
119         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
120
121         EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
122
123         verifyEntityOwnershipCandidateRegistration(entity2, reg2);
124         verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
125
126         service.close();
127     }
128
129     @Test
130     public void testRegisterListener() {
131     }
132
133     private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
134             EntityOwnershipCandidate candidate) {
135         RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
136         assertSame("getCandidate", candidate, regCandidate.getCandidate());
137         assertEquals("getEntity", entity, regCandidate.getEntity());
138     }
139
140     private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
141         assertNotNull("EntityOwnershipCandidateRegistration null", reg);
142         assertEquals("getEntity", entity, reg.getEntity());
143     }
144
145     static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
146         private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
147         private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
148         private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
149
150         @Override
151         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
152                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
153             return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
154                     schemaContext, messageClass, messageReceived, receivedMessage);
155         }
156
157         @SuppressWarnings("unchecked")
158         <T> T waitForShardMessage() {
159             assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly(
160                     messageReceived.get(), 5, TimeUnit.SECONDS));
161             assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
162             return (T) receivedMessage.get();
163         }
164
165         void expectShardMessage(Class<?> ofType) {
166             messageReceived.set(new CountDownLatch(1));
167             receivedMessage.set(null);
168             messageClass.set(ofType);
169         }
170     }
171
172     static class TestEntityOwnershipShard extends EntityOwnershipShard {
173         private final AtomicReference<CountDownLatch> messageReceived;
174         private final AtomicReference<Object> receivedMessage;
175         private final AtomicReference<Class<?>> messageClass;
176
177         protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
178                 DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
179                 AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
180             super(name, peerAddresses, datastoreContext, schemaContext);
181             this.messageClass = messageClass;
182             this.messageReceived = messageReceived;
183             this.receivedMessage = receivedMessage;
184         }
185
186         @Override
187         public void onReceiveCommand(final Object message) throws Exception {
188             try {
189                 super.onReceiveCommand(message);
190             } finally {
191                 Class<?> expMsgClass = messageClass.get();
192                 if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
193                     receivedMessage.set(message);
194                     messageReceived.get().countDown();
195                 }
196             }
197         }
198     }
199 }