*/
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private static final Object SHUTDOWN = new Object() {
- @Override
- public String toString() {
- return "SHUTDOWN";
- }
- };
private long nextHistoryId;
// FIXME: Add state flushing here once we have state
}
- private void createLocalHistory(final CreateLocalHistoryCommand command) {
- final CompletableFuture<ClientLocalHistory> future = command.future();
+ private ClientActorBehavior createLocalHistory(final CompletableFuture<ClientLocalHistory> future) {
final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
// FIXME: initiate backend instantiation
future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+ return this;
+ }
+
+ private ClientActorBehavior shutdown() {
+ // FIXME: Add shutdown procedures here
+ return null;
}
@Override
protected ClientActorBehavior onCommand(final Object command) {
- if (command instanceof CreateLocalHistoryCommand) {
- createLocalHistory((CreateLocalHistoryCommand) command);
- } else if (command instanceof GetClientRequest) {
+ if (command instanceof GetClientRequest) {
((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
- } else if (SHUTDOWN.equals(command)) {
- // FIXME: Add shutdown procedures here
- return null;
} else {
LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
}
@Override
public CompletionStage<ClientLocalHistory> createLocalHistory() {
- final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
- self().tell(command, ActorRef.noSender());
- return command.future();
+ final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
+ context().executeInActor(() -> createLocalHistory(future));
+ return future;
}
@Override
public void close() {
- self().tell(SHUTDOWN, ActorRef.noSender());
+ context().executeInActor(this::shutdown);
}
}