-
- static class TestShardBuilder extends EntityOwnershipShard.Builder {
- TestShardBuilder() {
- localMemberName("member-1").ownerSelectionStrategyConfig(
- EntityOwnerSelectionStrategyConfig.newBuilder().build());
- }
-
- private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
- private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
- private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
- private final AtomicReference<DataTree> dataTree = new AtomicReference<>();
-
- @Override
- public Props props() {
- verify();
- return Props.create(TestEntityOwnershipShard.class,this, messageClass, messageReceived,
- receivedMessage, dataTree);
- }
-
- @SuppressWarnings("unchecked")
- <T> T waitForShardMessage() {
- assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
- Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
- assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
- return (T) receivedMessage.get();
- }
-
- void expectShardMessage(Class<?> ofType) {
- messageReceived.set(new CountDownLatch(1));
- receivedMessage.set(null);
- messageClass.set(ofType);
- }
-
- void setDataTree(DataTree tree) {
- this.dataTree.set(tree);
- }
- }
-
- static class TestEntityOwnershipShard extends EntityOwnershipShard {
- private final AtomicReference<CountDownLatch> messageReceived;
- private final AtomicReference<Object> receivedMessage;
- private final AtomicReference<Class<?>> messageClass;
- private final AtomicReference<DataTree> dataTree;
-
- protected TestEntityOwnershipShard(EntityOwnershipShard.Builder builder,
- AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
- AtomicReference<Object> receivedMessage, AtomicReference<DataTree> dataTree) {
- super(builder);
- this.messageClass = messageClass;
- this.messageReceived = messageReceived;
- this.receivedMessage = receivedMessage;
- this.dataTree = dataTree;
- }
-
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
- try {
- if(dataTree.get() != null && message instanceof GetShardDataTree) {
- sender().tell(dataTree.get(), self());
- } else {
- super.onReceiveCommand(message);
- }
- } finally {
- Class<?> expMsgClass = messageClass.get();
- if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
- receivedMessage.set(message);
- messageReceived.get().countDown();
- }
- }
- }
- }