package org.opendaylight.controller.cluster.databroker.actors.dds;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Status;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Promise;
private ActorSystem system;
private ModuleShardBackendResolver moduleShardBackendResolver;
private TestProbe contextProbe;
+ private TestProbe shardManagerProbe;
@Mock
private ShardStrategyFactory shardStrategyFactory;
MockitoAnnotations.initMocks(this);
system = ActorSystem.apply();
contextProbe = new TestProbe(system, "context");
+
+ shardManagerProbe = new TestProbe(system, "ShardManager");
+
final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
+ when(actorContext.getShardManager()).thenReturn(shardManagerProbe.ref());
+
moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
@Test
public void testResolveShardForPathNonNullCookie() {
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
+ when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
Assert.assertEquals(0L, cookie.longValue());
}
final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
- Assert.assertEquals("default", shardBackendInfo.getShardName());
+ Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getShardName());
}
@Test
Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNotifyWhenBackendInfoIsStale() {
+ final RegisterForShardAvailabilityChanges regMessage =
+ shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
+ Registration mockReg = mock(Registration.class);
+ shardManagerProbe.reply(new Status.Success(mockReg));
+
+ Consumer<Long> mockCallback = mock(Consumer.class);
+ final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ verify(mockCallback, timeout(5000)).accept(Long.valueOf(0));
+
+ reset(mockCallback);
+ callbackReg.close();
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+ }
+
private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
final ActorContext mock = mock(ActorContext.class);
final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
- when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
+ when(mock.findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD)).thenReturn(promise.future());
+ when(mock.getClientDispatcher()).thenReturn(system.dispatchers().defaultGlobalDispatcher());
return mock;
}
}