*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import java.util.function.Function;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
- * frontend.
- *
- * This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
- *
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
- * When touching internal state, be mindful of the execution context from which execution context, Actor
- * or POJO, is the state being accessed or modified.
- *
- * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
- * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
- * doubt, feel free to synchronize on this object.
- *
- * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
- * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
- * for correctness and performance impact.
- *
- * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
- * for performing work and charging applications for it. That has two positive effects:
- * - CPU usage is distributed across applications, minimizing work done in the actor thread
- * - CPU usage provides back-pressure towards the application.
+ * {@link AbstractDataStoreClientBehavior} which performs module-based sharding.
*
* @author Robert Varga
*/
-final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-
- private final ModuleShardBackendResolver resolver;
- private long nextHistoryId;
-
- DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
- super(context);
- resolver = new ModuleShardBackendResolver(actorContext);
- }
-
- //
- //
- // Methods below are invoked from the client actor thread
- //
- //
-
- @Override
- protected void haltClient(final Throwable cause) {
- // FIXME: Add state flushing here once we have state
- }
+final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
+ private final Function<YangInstanceIdentifier, Long> pathToShard;
- private ClientActorBehavior createLocalHistory(final CompletableFuture<ClientLocalHistory> future) {
- final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
- LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
-
- // FIXME: initiate backend instantiation
- future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
- return this;
- }
-
- private ClientActorBehavior shutdown() {
- // FIXME: Add shutdown procedures here
- return null;
- }
-
- @Override
- protected ClientActorBehavior onCommand(final Object command) {
- if (command instanceof GetClientRequest) {
- ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
- } else {
- LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
- }
-
- return this;
+ private DistributedDataStoreClientBehavior(final ClientActorContext context,
+ final ModuleShardBackendResolver resolver) {
+ super(context, resolver);
+ pathToShard = resolver::resolveShardForPath;
}
- //
- //
- // Methods below are invoked from application threads
- //
- //
-
- @Override
- public CompletionStage<ClientLocalHistory> createLocalHistory() {
- final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
- context().executeInActor(() -> createLocalHistory(future));
- return future;
- }
-
- @Override
- public void close() {
- context().executeInActor(this::shutdown);
+ DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
+ this(context, new ModuleShardBackendResolver(context.getIdentifier(), actorContext));
}
@Override
- protected ModuleShardBackendResolver resolver() {
- return resolver;
+ Long resolveShardForPath(final YangInstanceIdentifier path) {
+ return pathToShard.apply(path);
}
}