2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import java.util.concurrent.CompletableFuture;
13 import java.util.concurrent.CompletionStage;
14 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
15 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
16 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
17 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
22 * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
25 * This class is not visible outside of this package because it breaks the actor containment. Services provided to
26 * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
28 * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
29 * When touching internal state, be mindful of the execution context from which execution context, Actor
30 * or POJO, is the state being accessed or modified.
32 * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
33 * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
34 * doubt, feel free to synchronize on this object.
36 * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
37 * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
38 * for correctness and performance impact.
40 * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
41 * for performing work and charging applications for it. That has two positive effects:
42 * - CPU usage is distributed across applications, minimizing work done in the actor thread
43 * - CPU usage provides back-pressure towards the application.
45 * @author Robert Varga
47 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
48 private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
50 private final ModuleShardBackendResolver resolver;
51 private long nextHistoryId;
53 DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
55 resolver = new ModuleShardBackendResolver(actorContext);
60 // Methods below are invoked from the client actor thread
65 protected void haltClient(final Throwable cause) {
66 // FIXME: Add state flushing here once we have state
69 private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
70 final CompletableFuture<ClientLocalHistory> future) {
71 final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
72 LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
74 // FIXME: initiate backend instantiation
75 future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
76 return currentBehavior;
79 private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
80 // FIXME: Add shutdown procedures here
85 protected ClientActorBehavior onCommand(final Object command) {
86 if (command instanceof GetClientRequest) {
87 ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
89 LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
97 // Methods below are invoked from application threads
102 public CompletionStage<ClientLocalHistory> createLocalHistory() {
103 final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
104 context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
109 public void close() {
110 context().executeInActor(this::shutdown);
114 protected ModuleShardBackendResolver resolver() {