2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.function.Consumer;
18 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
19 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
20 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.Response;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
32 * This class is not visible outside of this package because it breaks the actor containment. Services provided to
33 * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
35 * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
36 * When touching internal state, be mindful of which execution context, Actor or POJO, the state is being
37 * accessed or modified from.
39 * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
40 * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
41 * doubt, feel free to synchronize on this object.
43 * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
44 * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
45 * for correctness and performance impact.
47 * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
48 * for performing work and charging applications for it. That has two positive effects:
49 * - CPU usage is distributed across applications, minimizing work done in the actor thread
50 * - CPU usage provides back-pressure towards the application.
52 * @author Robert Varga
54 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
// Logger for this behavior class.
55 private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
// Transactions handed out by createTransaction(), keyed by their identifier. Entries are removed by
// transactionComplete() and the whole map is cleared by abortOperations().
57 private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
// Local histories handed out by createLocalHistory(), keyed by their identifier.
58 private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
// Source of history numbers for createLocalHistory(). Starts at 1; history number 0 is taken by singleHistory
// in the constructor.
59 private final AtomicLong nextHistoryId = new AtomicLong(1);
// Source of transaction numbers for createTransaction(). Starts at 0.
60 private final AtomicLong nextTransactionId = new AtomicLong();
// Backend resolver, built in the constructor from the client identifier and the ActorContext.
61 private final ModuleShardBackendResolver resolver;
// The history (number 0) under which createTransaction() allocates its transactions.
62 private final SingleClientHistory singleHistory;
// Failure cause once the client has been aborted, null while operational. Volatile because it is read by the
// create*() methods, which run in application threads.
// NOTE(review): the write to this field is not visible in this excerpt - presumably done in abortOperations().
64 private volatile Throwable aborted;
// Creates the behavior.
//   context      - client actor context; its identifier is handed to the backend resolver
//   actorContext - datastore ActorContext handed to the backend resolver
// NOTE(review): one statement between the signature and the resolver assignment is not visible in this excerpt.
66 DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
68 resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
// The single history is bound to this behavior and uses history number 0.
69 singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
74 // Methods below are invoked from the client actor thread
79 protected void haltClient(final Throwable cause) {
80 // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
81 // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
83 if (aborted != null) {
84 abortOperations(cause);
// Called by haltClient() and shutdown() with the reason the client is going away. Walks every tracked local
// history and every tracked transaction, then empties the transactions map.
88 private void abortOperations(final Throwable cause) {
89 // This acts as a barrier, application threads check this after they have added an entry in the maps,
90 // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
// NOTE(review): the statement this barrier comment refers to is not visible in this excerpt - presumably the
// assignment of 'cause' to the volatile 'aborted' field; confirm.
93 for (ClientLocalHistory h : histories.values()) {
// NOTE(review): the loop bodies are not visible in this excerpt - presumably each handle is aborted with 'cause'.
98 for (ClientTransaction t : transactions.values()) {
// Forget all tracked transactions.
101 transactions.clear();
// Shutdown command, submitted to the actor by close(). Aborts all outstanding operations with an
// IllegalStateException which names this client. The currentBehavior argument is not used by the visible code.
// NOTE(review): the return statement is not visible in this excerpt.
104 private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
105 abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
// Handles a command delivered to the client actor.
110 protected DistributedDataStoreClientBehavior onCommand(final Object command) {
111 if (command instanceof GetClientRequest) {
// Reply to the requester with this behavior itself, wrapped in Status.Success, without a sender reference.
112 ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
// NOTE(review): the branch structure around this warning and the return statement are not visible in this
// excerpt; the message indicates it is meant for commands other than GetClientRequest.
114 LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
122 // Methods below are invoked from application threads
126 private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
127 final Throwable aborted) {
128 Verify.verify(map.put(key, value) == null);
130 if (aborted != null) {
132 value.localAbort(aborted);
133 } catch (Exception e) {
134 LOG.debug("Close of {} failed", value, e);
136 map.remove(key, value);
137 throw Throwables.propagate(aborted);
144 public ClientLocalHistory createLocalHistory() {
145 final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
146 nextHistoryId.getAndIncrement());
147 final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
148 LOG.debug("{}: creating a new local history {}", persistenceId(), history);
150 return returnIfOperational(histories, historyId, history, aborted);
154 public ClientTransaction createTransaction() {
155 final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
156 nextTransactionId.getAndIncrement());
157 final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
158 LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
160 return returnIfOperational(transactions, txId, tx, aborted);
164 public void close() {
165 context().executeInActor(this::shutdown);
// Accessor for the backend resolver.
// NOTE(review): the body is not visible in this excerpt - presumably it returns the 'resolver' field; confirm.
169 protected ModuleShardBackendResolver resolver() {
173 void transactionComplete(final ClientTransaction transaction) {
174 transactions.remove(transaction.getIdentifier());
177 void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
178 sendRequest(request, response -> {
179 completer.accept(response);