d1c322f86aad61c9642945cd7a3d78987828b80f
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolverTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.when;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.testkit.JavaTestKit;
17 import akka.testkit.TestProbe;
18 import java.util.Collections;
19 import java.util.concurrent.CompletionStage;
20 import java.util.concurrent.ExecutionException;
21 import org.junit.After;
22 import org.junit.Assert;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.mockito.Mock;
26 import org.mockito.MockitoAnnotations;
27 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
28 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
29 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
30 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
31 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
33 import org.opendaylight.controller.cluster.access.concepts.MemberName;
34 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
35 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
37 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
42 import scala.concurrent.Promise;
43
44 public class ModuleShardBackendResolverTest {
45
46     private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
47     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
48     private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
49     private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
50
51     private ActorSystem system;
52     private ModuleShardBackendResolver moduleShardBackendResolver;
53     private TestProbe contextProbe;
54
55     @Mock
56     private ShardStrategyFactory shardStrategyFactory;
57     @Mock
58     private ShardStrategy shardStrategy;
59     @Mock
60     private DataTree dataTree;
61
62     @Before
63     public void setUp() throws Exception {
64         MockitoAnnotations.initMocks(this);
65         system = ActorSystem.apply();
66         contextProbe = new TestProbe(system, "context");
67         final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
68         moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
69         when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
70         when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
71         final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
72         when(actorContext.getPrimaryShardInfoCache()).thenReturn(cache);
73     }
74
75     @After
76     public void tearDown() throws Exception {
77         JavaTestKit.shutdownActorSystem(system);
78     }
79
80     @Test
81     public void testResolveShardForPathNonNullCookie() throws Exception {
82         when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
83         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
84         Assert.assertEquals(0L, cookie.longValue());
85     }
86
87     @Test
88     public void testResolveShardForPathNullCookie() throws Exception {
89         when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo");
90         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
91         Assert.assertEquals(1L, cookie.longValue());
92     }
93
94     @Test
95     public void testGetBackendInfo() throws Exception {
96         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
97         contextProbe.expectMsgClass(ConnectClientRequest.class);
98         final TestProbe backendProbe = new TestProbe(system, "backend");
99         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
100                 Collections.emptyList(), dataTree, 3);
101         contextProbe.reply(msg);
102         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
103         final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
104         Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
105         Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
106         Assert.assertEquals("default", shardBackendInfo.getShardName());
107     }
108
109     @Test
110     public void testGetBackendInfoFail() throws Exception {
111         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
112         final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
113         final RuntimeException cause = new RuntimeException();
114         final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
115         contextProbe.reply(response);
116         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
117         final ExecutionException caught =
118                 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
119                         ExecutionException.class);
120         Assert.assertNotNull(caught.getCause());
121         Assert.assertEquals(cause, caught.getCause().getCause());
122     }
123
124     @Test
125     public void testRefreshBackendInfo() throws Exception {
126         final CompletionStage<ShardBackendInfo> backendInfo = moduleShardBackendResolver.getBackendInfo(0L);
127         //handle first connect
128         contextProbe.expectMsgClass(ConnectClientRequest.class);
129         final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
130         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
131                 Collections.emptyList(), dataTree, 3);
132         contextProbe.reply(msg);
133         //get backend info
134         final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
135         //refresh
136         final CompletionStage<ShardBackendInfo> refreshed =
137                 moduleShardBackendResolver.refreshBackendInfo(0L, staleBackendInfo);
138         //stale backend info should be removed and new connect request issued to the context
139         contextProbe.expectMsgClass(ConnectClientRequest.class);
140         final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
141         final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
142                 Collections.emptyList(), dataTree, 3);
143         contextProbe.reply(msg2);
144         final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
145         Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
146         Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
147     }
148
149     private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
150         final ActorContext mock = mock(ActorContext.class);
151         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
152         final ActorSelection selection = system.actorSelection(actor.path());
153         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
154         promise.success(shardInfo);
155         when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
156         return mock;
157     }
158 }