2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertSame;
12 import static org.junit.Assert.assertThrows;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Status;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import java.util.List;
25 import java.util.Optional;
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
30 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
31 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
32 import org.opendaylight.controller.cluster.access.client.InternalCommand;
33 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
34 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeModification;
40 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
41 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
42 import scala.concurrent.Promise;
44 public abstract class AbstractDataStoreClientBehaviorTest {
46 protected static final String SHARD = "default";
47 private static final String PERSISTENCE_ID = "per-1";
49 private ActorSystem system;
50 private ClientActorContext clientContext;
51 private TestProbe clientActorProbe;
52 private TestProbe actorContextProbe;
53 private AbstractDataStoreClientBehavior behavior;
54 private ActorUtils util;
58 system = ActorSystem.apply();
59 clientActorProbe = new TestProbe(system, "client");
60 actorContextProbe = new TestProbe(system, "actor-context");
61 util = createActorContextMock(system, actorContextProbe.ref());
63 AccessClientUtil.createClientActorContext(system, clientActorProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
64 behavior = createBehavior(clientContext, util);
67 @SuppressWarnings("checkstyle:hiddenField")
68 protected abstract AbstractDataStoreClientBehavior createBehavior(ClientActorContext clientContext,
72 public void tearDown() {
73 TestKit.shutdownActorSystem(system);
77 public void testResolveShardForPath() {
78 assertEquals(0L, behavior.resolveShardForPath(YangInstanceIdentifier.of()).longValue());
82 public void testHaltClient() {
83 behavior.haltClient(new RuntimeException());
87 public void testOnCommand() {
88 final TestProbe probe = new TestProbe(system);
89 final GetClientRequest request = new GetClientRequest(probe.ref());
90 final AbstractDataStoreClientBehavior nextBehavior = behavior.onCommand(request);
91 final Status.Success success = probe.expectMsgClass(Status.Success.class);
92 assertEquals(behavior, success.status());
93 assertSame(behavior, nextBehavior);
97 public void testOnCommandUnhandled() {
98 final AbstractDataStoreClientBehavior nextBehavior = behavior.onCommand("unhandled");
99 assertSame(behavior, nextBehavior);
103 public void testCreateLocalHistory() {
104 final ClientLocalHistory history = behavior.createLocalHistory();
105 assertEquals(behavior.getIdentifier(), history.getIdentifier().getClientId());
109 public void testCreateTransaction() {
110 final ClientTransaction transaction = behavior.createTransaction();
111 assertEquals(behavior.getIdentifier(), transaction.getIdentifier().getHistoryId().getClientId());
115 public void testCreateSnapshot() {
116 final ClientSnapshot snapshot = behavior.createSnapshot();
117 assertEquals(behavior.getIdentifier(), snapshot.getIdentifier().getHistoryId().getClientId());
121 public void testClose() {
123 final InternalCommand<ShardBackendInfo> internalCommand =
124 clientActorProbe.expectMsgClass(InternalCommand.class);
125 internalCommand.execute(behavior);
127 assertThrows(IllegalStateException.class, () -> behavior.createLocalHistory());
131 public void testGetIdentifier() {
132 assertEquals(CLIENT_ID, behavior.getIdentifier());
136 public void testGetConnection() {
137 final var datastoreContext = mock(DatastoreContext.class);
138 doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
139 doReturn(datastoreContext).when(util).getDatastoreContext();
141 //set up data tree mock
142 final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
143 doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.of());
144 final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
145 doReturn(modification).when(snapshot).newModification();
146 final DataTree dataTree = mock(DataTree.class);
147 doReturn(snapshot).when(dataTree).takeSnapshot();
149 final TestProbe backendProbe = new TestProbe(system, "backend");
150 final long shard = 0L;
152 behavior.createTransaction().read(YangInstanceIdentifier.of());
153 final AbstractClientConnection<ShardBackendInfo> connection = behavior.getConnection(shard);
154 //check cached connection for same shard
155 assertSame(connection, behavior.getConnection(shard));
157 final ConnectClientRequest connectClientRequest = actorContextProbe.expectMsgClass(ConnectClientRequest.class);
158 assertEquals(CLIENT_ID, connectClientRequest.getTarget());
159 final long sequence = 0L;
160 assertEquals(sequence, connectClientRequest.getSequence());
161 actorContextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(), List.of(), dataTree,
163 assertEquals(clientActorProbe.ref(), connection.localActor());
164 //capture and execute command passed to client context
165 final InternalCommand<ShardBackendInfo> command = clientActorProbe.expectMsgClass(InternalCommand.class);
166 command.execute(behavior);
167 //check, whether command was reaplayed
168 verify(modification).readNode(YangInstanceIdentifier.of());
171 private static ActorUtils createActorContextMock(final ActorSystem system, final ActorRef actor) {
172 final ActorUtils mock = mock(ActorUtils.class);
173 final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
174 final ActorSelection selection = system.actorSelection(actor.path());
175 final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
176 promise.success(shardInfo);
177 doReturn(promise.future()).when(mock).findPrimaryShardAsync(SHARD);