/*
 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.mock;
13 import static org.mockito.Mockito.reset;
14 import static org.mockito.Mockito.timeout;
15 import static org.mockito.Mockito.verify;
16 import static org.mockito.Mockito.verifyNoMoreInteractions;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Status;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.List;
26 import java.util.concurrent.CompletionStage;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.junit.MockitoJUnitRunner;
36 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
37 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
38 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
39 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
40 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
41 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
49 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.yangtools.concepts.Registration;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
54 import scala.concurrent.Promise;
56 @RunWith(MockitoJUnitRunner.StrictStubs.class)
57 public class ModuleShardBackendResolverTest {
59 private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
60 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
61 private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
62 private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
64 private ActorSystem system;
65 private ModuleShardBackendResolver moduleShardBackendResolver;
66 private TestProbe contextProbe;
67 private TestProbe shardManagerProbe;
70 private ShardStrategyFactory shardStrategyFactory;
72 private ShardStrategy shardStrategy;
74 private DataTree dataTree;
78 system = ActorSystem.apply();
79 contextProbe = new TestProbe(system, "context");
81 shardManagerProbe = new TestProbe(system, "ShardManager");
83 final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
84 doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager();
86 moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
87 doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory();
88 doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty());
89 final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
90 doReturn(cache).when(actorUtils).getPrimaryShardInfoCache();
94 public void tearDown() {
95 TestKit.shutdownActorSystem(system);
99 public void testResolveShardForPathNonNullCookie() {
100 doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty());
101 final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
102 assertEquals(0L, (long) cookie);
106 public void testResolveShardForPathNullCookie() {
107 doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty());
108 final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
109 assertEquals(1L, (long) cookie);
113 public void testGetBackendInfo() throws Exception {
114 final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
115 contextProbe.expectMsgClass(ConnectClientRequest.class);
116 final TestProbe backendProbe = new TestProbe(system, "backend");
117 final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
118 List.of(), dataTree, 3);
119 contextProbe.reply(msg);
120 final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
121 final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
122 assertEquals(0L, shardBackendInfo.getCookie().longValue());
123 assertEquals(dataTree, shardBackendInfo.getDataTree().orElseThrow());
124 assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
128 public void testGetBackendInfoFail() throws Exception {
129 final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
130 final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
131 final RuntimeException cause = new RuntimeException();
132 final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
133 contextProbe.reply(response);
134 final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
135 final ExecutionException caught =
136 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
137 ExecutionException.class);
138 assertEquals(cause, caught.getCause());
142 public void testRefreshBackendInfo() throws Exception {
143 final CompletionStage<ShardBackendInfo> backendInfo = moduleShardBackendResolver.getBackendInfo(0L);
144 //handle first connect
145 contextProbe.expectMsgClass(ConnectClientRequest.class);
146 final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
147 final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
148 List.of(), dataTree, 3);
149 contextProbe.reply(msg);
151 final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
153 final CompletionStage<ShardBackendInfo> refreshed =
154 moduleShardBackendResolver.refreshBackendInfo(0L, staleBackendInfo);
155 //stale backend info should be removed and new connect request issued to the context
156 contextProbe.expectMsgClass(ConnectClientRequest.class);
157 final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
158 final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
159 List.of(), dataTree, 3);
160 contextProbe.reply(msg2);
161 final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
162 assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
163 assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
166 @SuppressWarnings("unchecked")
168 public void testNotifyWhenBackendInfoIsStale() {
169 final RegisterForShardAvailabilityChanges regMessage =
170 shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
171 Registration mockReg = mock(Registration.class);
172 shardManagerProbe.reply(new Status.Success(mockReg));
174 Consumer<Long> mockCallback = mock(Consumer.class);
175 final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
177 regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
178 verify(mockCallback, timeout(5000)).accept((long) 0);
183 regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
184 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
185 verifyNoMoreInteractions(mockCallback);
188 private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
189 final ActorUtils mock = mock(ActorUtils.class);
190 final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
191 final ActorSelection selection = system.actorSelection(actor.path());
192 final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
193 promise.success(shardInfo);
194 doReturn(promise.future()).when(mock).findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD);