2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.reset;
12 import static org.mockito.Mockito.timeout;
13 import static org.mockito.Mockito.verify;
14 import static org.mockito.Mockito.verifyNoMoreInteractions;
15 import static org.mockito.Mockito.when;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Status;
21 import akka.testkit.TestProbe;
22 import akka.testkit.javadsl.TestKit;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.Collections;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
36 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
37 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
38 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
39 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
40 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
41 import org.opendaylight.controller.cluster.access.concepts.MemberName;
42 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
44 import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
45 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
49 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
53 import scala.concurrent.Promise;
55 public class ModuleShardBackendResolverTest {
57 private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
58 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
59 private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
60 private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
62 private ActorSystem system;
63 private ModuleShardBackendResolver moduleShardBackendResolver;
64 private TestProbe contextProbe;
65 private TestProbe shardManagerProbe;
68 private ShardStrategyFactory shardStrategyFactory;
70 private ShardStrategy shardStrategy;
72 private DataTree dataTree;
76 MockitoAnnotations.initMocks(this);
77 system = ActorSystem.apply();
78 contextProbe = new TestProbe(system, "context");
80 shardManagerProbe = new TestProbe(system, "ShardManager");
82 final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
83 when(actorUtils.getShardManager()).thenReturn(shardManagerProbe.ref());
85 moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
86 when(actorUtils.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
87 when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.empty())).thenReturn(shardStrategy);
88 final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
89 when(actorUtils.getPrimaryShardInfoCache()).thenReturn(cache);
93 public void tearDown() {
94 TestKit.shutdownActorSystem(system);
98 public void testResolveShardForPathNonNullCookie() {
99 when(shardStrategy.findShard(YangInstanceIdentifier.empty())).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
100 final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
101 Assert.assertEquals(0L, cookie.longValue());
105 public void testResolveShardForPathNullCookie() {
106 when(shardStrategy.findShard(YangInstanceIdentifier.empty())).thenReturn("foo");
107 final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
108 Assert.assertEquals(1L, cookie.longValue());
112 public void testGetBackendInfo() throws Exception {
113 final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
114 contextProbe.expectMsgClass(ConnectClientRequest.class);
115 final TestProbe backendProbe = new TestProbe(system, "backend");
116 final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
117 Collections.emptyList(), dataTree, 3);
118 contextProbe.reply(msg);
119 final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
120 final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
121 Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
122 Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
123 Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
127 public void testGetBackendInfoFail() throws Exception {
128 final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
129 final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
130 final RuntimeException cause = new RuntimeException();
131 final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
132 contextProbe.reply(response);
133 final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
134 final ExecutionException caught =
135 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
136 ExecutionException.class);
137 Assert.assertEquals(cause, caught.getCause());
141 public void testRefreshBackendInfo() throws Exception {
142 final CompletionStage<ShardBackendInfo> backendInfo = moduleShardBackendResolver.getBackendInfo(0L);
143 //handle first connect
144 contextProbe.expectMsgClass(ConnectClientRequest.class);
145 final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
146 final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
147 Collections.emptyList(), dataTree, 3);
148 contextProbe.reply(msg);
150 final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
152 final CompletionStage<ShardBackendInfo> refreshed =
153 moduleShardBackendResolver.refreshBackendInfo(0L, staleBackendInfo);
154 //stale backend info should be removed and new connect request issued to the context
155 contextProbe.expectMsgClass(ConnectClientRequest.class);
156 final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
157 final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
158 Collections.emptyList(), dataTree, 3);
159 contextProbe.reply(msg2);
160 final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
161 Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
162 Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
165 @SuppressWarnings("unchecked")
167 public void testNotifyWhenBackendInfoIsStale() {
168 final RegisterForShardAvailabilityChanges regMessage =
169 shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
170 Registration mockReg = mock(Registration.class);
171 shardManagerProbe.reply(new Status.Success(mockReg));
173 Consumer<Long> mockCallback = mock(Consumer.class);
174 final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
176 regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
177 verify(mockCallback, timeout(5000)).accept(Long.valueOf(0));
182 regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
183 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
184 verifyNoMoreInteractions(mockCallback);
187 private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
188 final ActorUtils mock = mock(ActorUtils.class);
189 final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
190 final ActorSelection selection = system.actorSelection(actor.path());
191 final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
192 promise.success(shardInfo);
193 when(mock.findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD)).thenReturn(promise.future());
194 when(mock.getClientDispatcher()).thenReturn(system.dispatchers().defaultGlobalDispatcher());