Migrate from YangInstanceIdentifier.EMPTY
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolverTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.reset;
12 import static org.mockito.Mockito.timeout;
13 import static org.mockito.Mockito.verify;
14 import static org.mockito.Mockito.verifyNoMoreInteractions;
15 import static org.mockito.Mockito.when;
16
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Status;
21 import akka.testkit.TestProbe;
22 import akka.testkit.javadsl.TestKit;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.Collections;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
36 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
37 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
38 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
39 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
40 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
41 import org.opendaylight.controller.cluster.access.concepts.MemberName;
42 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
44 import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
45 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
49 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
53 import scala.concurrent.Promise;
54
55 public class ModuleShardBackendResolverTest {
56
57     private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
58     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
59     private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
60     private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
61
62     private ActorSystem system;
63     private ModuleShardBackendResolver moduleShardBackendResolver;
64     private TestProbe contextProbe;
65     private TestProbe shardManagerProbe;
66
67     @Mock
68     private ShardStrategyFactory shardStrategyFactory;
69     @Mock
70     private ShardStrategy shardStrategy;
71     @Mock
72     private DataTree dataTree;
73
74     @Before
75     public void setUp() {
76         MockitoAnnotations.initMocks(this);
77         system = ActorSystem.apply();
78         contextProbe = new TestProbe(system, "context");
79
80         shardManagerProbe = new TestProbe(system, "ShardManager");
81
82         final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
83         when(actorUtils.getShardManager()).thenReturn(shardManagerProbe.ref());
84
85         moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
86         when(actorUtils.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
87         when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.empty())).thenReturn(shardStrategy);
88         final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
89         when(actorUtils.getPrimaryShardInfoCache()).thenReturn(cache);
90     }
91
92     @After
93     public void tearDown() {
94         TestKit.shutdownActorSystem(system);
95     }
96
97     @Test
98     public void testResolveShardForPathNonNullCookie() {
99         when(shardStrategy.findShard(YangInstanceIdentifier.empty())).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
100         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
101         Assert.assertEquals(0L, cookie.longValue());
102     }
103
104     @Test
105     public void testResolveShardForPathNullCookie() {
106         when(shardStrategy.findShard(YangInstanceIdentifier.empty())).thenReturn("foo");
107         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
108         Assert.assertEquals(1L, cookie.longValue());
109     }
110
111     @Test
112     public void testGetBackendInfo() throws Exception {
113         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
114         contextProbe.expectMsgClass(ConnectClientRequest.class);
115         final TestProbe backendProbe = new TestProbe(system, "backend");
116         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
117                 Collections.emptyList(), dataTree, 3);
118         contextProbe.reply(msg);
119         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
120         final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
121         Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
122         Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
123         Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
124     }
125
126     @Test
127     public void testGetBackendInfoFail() throws Exception {
128         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
129         final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
130         final RuntimeException cause = new RuntimeException();
131         final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
132         contextProbe.reply(response);
133         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
134         final ExecutionException caught =
135                 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
136                         ExecutionException.class);
137         Assert.assertEquals(cause, caught.getCause());
138     }
139
140     @Test
141     public void testRefreshBackendInfo() throws Exception {
142         final CompletionStage<ShardBackendInfo> backendInfo = moduleShardBackendResolver.getBackendInfo(0L);
143         //handle first connect
144         contextProbe.expectMsgClass(ConnectClientRequest.class);
145         final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
146         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
147                 Collections.emptyList(), dataTree, 3);
148         contextProbe.reply(msg);
149         //get backend info
150         final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
151         //refresh
152         final CompletionStage<ShardBackendInfo> refreshed =
153                 moduleShardBackendResolver.refreshBackendInfo(0L, staleBackendInfo);
154         //stale backend info should be removed and new connect request issued to the context
155         contextProbe.expectMsgClass(ConnectClientRequest.class);
156         final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
157         final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
158                 Collections.emptyList(), dataTree, 3);
159         contextProbe.reply(msg2);
160         final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
161         Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
162         Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
163     }
164
165     @SuppressWarnings("unchecked")
166     @Test
167     public void testNotifyWhenBackendInfoIsStale() {
168         final RegisterForShardAvailabilityChanges regMessage =
169                 shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
170         Registration mockReg = mock(Registration.class);
171         shardManagerProbe.reply(new Status.Success(mockReg));
172
173         Consumer<Long> mockCallback = mock(Consumer.class);
174         final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
175
176         regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
177         verify(mockCallback, timeout(5000)).accept(Long.valueOf(0));
178
179         reset(mockCallback);
180         callbackReg.close();
181
182         regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
183         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
184         verifyNoMoreInteractions(mockCallback);
185     }
186
187     private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
188         final ActorUtils mock = mock(ActorUtils.class);
189         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
190         final ActorSelection selection = system.actorSelection(actor.path());
191         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
192         promise.success(shardInfo);
193         when(mock.findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD)).thenReturn(promise.future());
194         when(mock.getClientDispatcher()).thenReturn(system.dispatchers().defaultGlobalDispatcher());
195         return mock;
196     }
197 }