Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolverTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.mock;
13 import static org.mockito.Mockito.reset;
14 import static org.mockito.Mockito.timeout;
15 import static org.mockito.Mockito.verify;
16 import static org.mockito.Mockito.verifyNoMoreInteractions;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Status;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.List;
26 import java.util.concurrent.CompletionStage;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.junit.MockitoJUnitRunner;
36 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
37 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
38 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
39 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
40 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
41 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
49 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.yangtools.concepts.Registration;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
54 import scala.concurrent.Promise;
55
56 @RunWith(MockitoJUnitRunner.StrictStubs.class)
57 public class ModuleShardBackendResolverTest {
58
59     private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
60     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
61     private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
62     private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
63
64     private ActorSystem system;
65     private ModuleShardBackendResolver moduleShardBackendResolver;
66     private TestProbe contextProbe;
67     private TestProbe shardManagerProbe;
68
69     @Mock
70     private ShardStrategyFactory shardStrategyFactory;
71     @Mock
72     private ShardStrategy shardStrategy;
73     @Mock
74     private DataTree dataTree;
75
76     @Before
77     public void setUp() {
78         system = ActorSystem.apply();
79         contextProbe = new TestProbe(system, "context");
80
81         shardManagerProbe = new TestProbe(system, "ShardManager");
82
83         final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
84         doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager();
85
86         moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
87         doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory();
88         doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty());
89         final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
90         doReturn(cache).when(actorUtils).getPrimaryShardInfoCache();
91     }
92
93     @After
94     public void tearDown() {
95         TestKit.shutdownActorSystem(system);
96     }
97
98     @Test
99     public void testResolveShardForPathNonNullCookie() {
100         doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty());
101         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
102         assertEquals(0L, (long) cookie);
103     }
104
105     @Test
106     public void testResolveShardForPathNullCookie() {
107         doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty());
108         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
109         assertEquals(1L, (long) cookie);
110     }
111
112     @Test
113     public void testGetBackendInfo() throws Exception {
114         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
115         contextProbe.expectMsgClass(ConnectClientRequest.class);
116         final TestProbe backendProbe = new TestProbe(system, "backend");
117         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
118                 List.of(), dataTree, 3);
119         contextProbe.reply(msg);
120         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
121         final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
122         assertEquals(0L, shardBackendInfo.getCookie().longValue());
123         assertEquals(dataTree, shardBackendInfo.getDataTree().get());
124         assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
125     }
126
127     @Test
128     public void testGetBackendInfoFail() throws Exception {
129         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
130         final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
131         final RuntimeException cause = new RuntimeException();
132         final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
133         contextProbe.reply(response);
134         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
135         final ExecutionException caught =
136                 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
137                         ExecutionException.class);
138         assertEquals(cause, caught.getCause());
139     }
140
141     @Test
142     public void testRefreshBackendInfo() throws Exception {
143         final CompletionStage<ShardBackendInfo> backendInfo = moduleShardBackendResolver.getBackendInfo(0L);
144         //handle first connect
145         contextProbe.expectMsgClass(ConnectClientRequest.class);
146         final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
147         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
148                 List.of(), dataTree, 3);
149         contextProbe.reply(msg);
150         //get backend info
151         final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
152         //refresh
153         final CompletionStage<ShardBackendInfo> refreshed =
154                 moduleShardBackendResolver.refreshBackendInfo(0L, staleBackendInfo);
155         //stale backend info should be removed and new connect request issued to the context
156         contextProbe.expectMsgClass(ConnectClientRequest.class);
157         final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
158         final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
159                 List.of(), dataTree, 3);
160         contextProbe.reply(msg2);
161         final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
162         assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
163         assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
164     }
165
166     @SuppressWarnings("unchecked")
167     @Test
168     public void testNotifyWhenBackendInfoIsStale() {
169         final RegisterForShardAvailabilityChanges regMessage =
170                 shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
171         Registration mockReg = mock(Registration.class);
172         shardManagerProbe.reply(new Status.Success(mockReg));
173
174         Consumer<Long> mockCallback = mock(Consumer.class);
175         final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
176
177         regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
178         verify(mockCallback, timeout(5000)).accept((long) 0);
179
180         reset(mockCallback);
181         callbackReg.close();
182
183         regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
184         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
185         verifyNoMoreInteractions(mockCallback);
186     }
187
188     private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
189         final ActorUtils mock = mock(ActorUtils.class);
190         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
191         final ActorSelection selection = system.actorSelection(actor.path());
192         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
193         promise.success(shardInfo);
194         doReturn(promise.future()).when(mock).findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD);
195         return mock;
196     }
197 }