- return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
- }
-
- private ActorSelection findPrimary() {
- Object result = getResult(shardManager, new FindPrimary(Shard.DEFAULT_NAME), ASK_DURATION);
-
- if(result instanceof PrimaryFound){
- PrimaryFound found = (PrimaryFound) result;
- LOG.error("Primary found {}", found.getPrimaryPath());
-
- return actorSystem.actorSelection(found.getPrimaryPath());
- }
- throw new RuntimeException("primary was not found");
- }
-
- private Object getResult(ActorRef actor, Object message, FiniteDuration duration){
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
-
- try {
- return Await.result(future, AWAIT_DURATION);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);