import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private static final Object SHUTDOWN = new Object() {
- @Override
- public String toString() {
- return "SHUTDOWN";
- }
- };
+ private final ModuleShardBackendResolver resolver;
private long nextHistoryId;
- DistributedDataStoreClientBehavior(final ClientActorContext context) {
+ DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
+ resolver = new ModuleShardBackendResolver(actorContext);
}
//
//
//
- private void createLocalHistory(final CreateLocalHistoryCommand command) {
- final CompletableFuture<ClientLocalHistory> future = command.future();
+ @Override
+ protected void haltClient(final Throwable cause) {
+ // FIXME: Add state flushing here once we have state
+ }
+
+ private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
+ final CompletableFuture<ClientLocalHistory> future) {
final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
// FIXME: initiate backend instantiation
future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+ return currentBehavior;
+ }
+
+ private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
+ // FIXME: Add shutdown procedures here
+ return null;
}
@Override
protected ClientActorBehavior onCommand(final Object command) {
- if (command instanceof CreateLocalHistoryCommand) {
- createLocalHistory((CreateLocalHistoryCommand) command);
- } else if (command instanceof GetClientRequest) {
+ if (command instanceof GetClientRequest) {
((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
- } else if (SHUTDOWN.equals(command)) {
- // Add shutdown procedures here
- return null;
} else {
LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
}
@Override
public CompletionStage<ClientLocalHistory> createLocalHistory() {
- final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
- self().tell(command, ActorRef.noSender());
- return command.future();
+ final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
+ context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
+ return future;
}
@Override
public void close() {
- self().tell(SHUTDOWN, ActorRef.noSender());
+ context().executeInActor(this::shutdown);
+ }
+
+ @Override
+ protected ModuleShardBackendResolver resolver() {
+ return resolver;
}
}