+
+ private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(EntityOwnershipShard.ENTITY_OWNERS_PATH).
+ checkedGet(5, TimeUnit.SECONDS);
+ if(optional.isPresent()) {
+ return optional.get();
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ return null;
+ }
+
+ private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
+ EntityOwnershipCandidate candidate) {
+ RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
+ assertSame("getCandidate", candidate, regCandidate.getCandidate());
+ assertEquals("getEntity", entity, regCandidate.getEntity());
+ }
+
+ private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
+ assertNotNull("EntityOwnershipCandidateRegistration null", reg);
+ assertEquals("getEntity", entity, reg.getEntity());
+ }
+
+ static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+ TestShardPropsCreator() {
+ super("member-1");
+ }
+
+ private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
+ private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
+ private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
+
+ @Override
+ public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
+ schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> T waitForShardMessage() {
+ assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
+ Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
+ assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
+ return (T) receivedMessage.get();
+ }
+
+ void expectShardMessage(Class<?> ofType) {
+ messageReceived.set(new CountDownLatch(1));
+ receivedMessage.set(null);
+ messageClass.set(ofType);
+ }
+ }
+
+ static class TestEntityOwnershipShard extends EntityOwnershipShard {
+ private final AtomicReference<CountDownLatch> messageReceived;
+ private final AtomicReference<Object> receivedMessage;
+ private final AtomicReference<Class<?>> messageClass;
+
+ protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
+ AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
+ AtomicReference<Object> receivedMessage) {
+ super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
+ this.messageClass = messageClass;
+ this.messageReceived = messageReceived;
+ this.receivedMessage = receivedMessage;
+ }
+
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ try {
+ super.onReceiveCommand(message);
+ } finally {
+ Class<?> expMsgClass = messageClass.get();
+ if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
+ receivedMessage.set(message);
+ messageReceived.get().countDown();
+ }
+ }
+ }
+ }