/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
import akka.actor.Status;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
 * frontend.
 *
 * <p>This class is not visible outside of this package because it breaks the actor containment. Services provided to
 * Java world outside of actor containment are captured in {@link DataStoreClient}.
 *
 * <p>IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
 * When touching internal state, be mindful of the execution context — Actor or POJO — from which
 * the state is being accessed or modified.
 *
 * <p>THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the
 * application threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
 * doubt, feel free to synchronize on this object.
 *
 * <p>PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient,
 * but performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
 * for correctness and performance impact.
 *
 * <p>TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it
 * ideal for performing work and charging applications for it. That has two positive effects:
 * - CPU usage is distributed across applications, minimizing work done in the actor thread
 * - CPU usage provides back-pressure towards the application.
 *
 * @author Robert Varga
 */
abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
implements DataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
// Application-created histories keyed by their identifier. Concurrent: written by application threads
// (createLocalHistory) and iterated from the actor thread (abort/reconnect paths).
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
// Monotonic allocator for local history identifiers; starts at 1 because 0 is used for singleHistory.
private final AtomicLong nextHistoryId = new AtomicLong(1);
// Write-locked during abort and connection switchover; read-locked by application threads creating histories.
private final StampedLock lock = new StampedLock();
// Implicit history backing createTransaction()/createSnapshot().
private final SingleClientHistory singleHistory;
// Failure cause recorded when operations are aborted; application threads check this under the read lock
// and refuse to hand out new handles once it is non-null.
private volatile Throwable aborted;
/**
 * Construct the behavior, attaching it to the supplied actor context and backend resolver and creating
 * the implicit single history (identifier 0) used for free-standing transactions and snapshots.
 *
 * @param context client actor context
 * @param resolver strategy for locating shard backends
 */
AbstractDataStoreClientBehavior(final ClientActorContext context,
        final AbstractShardBackendResolver resolver) {
    super(context, resolver);
    singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
82 // Methods below are invoked from the client actor thread
87 protected final void haltClient(final Throwable cause) {
88 // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
89 // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
91 if (aborted != null) {
92 abortOperations(cause);
96 private void abortOperations(final Throwable cause) {
97 final long stamp = lock.writeLock();
99 // This acts as a barrier, application threads check this after they have added an entry in the maps,
100 // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
103 for (ClientLocalHistory h : histories.values()) {
108 lock.unlockWrite(stamp);
112 private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
113 abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
118 protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
119 if (command instanceof GetClientRequest) {
120 ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
122 LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
/*
 * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
 * involved, as the messages need to be replayed to the individual proxies.
 */
133 protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
134 final long stamp = lock.writeLock();
136 // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
137 // further TransactionProxies can be created and we can safely traverse maps without risking
139 final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
140 startReconnect(singleHistory, newConn, cohorts);
141 for (ClientLocalHistory h : histories.values()) {
142 startReconnect(h, newConn, cohorts);
145 return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
148 private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
149 final long stamp, final Collection<HistoryReconnectCohort> cohorts,
150 final Collection<ConnectionEntry> previousEntries) {
152 // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
153 // the non-throttling interface to the connection, hence we use a wrapper consumer
154 for (HistoryReconnectCohort c : cohorts) {
155 c.replayRequests(previousEntries);
158 // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
159 // requests will be immediately sent to it and requests being sent concurrently will get
160 // forwarded once they hit the new connection.
161 return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
164 // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
165 for (HistoryReconnectCohort c : cohorts) {
169 lock.unlockWrite(stamp);
174 private static void startReconnect(final AbstractClientHistory history,
175 final ConnectedClientConnection<ShardBackendInfo> newConn,
176 final Collection<HistoryReconnectCohort> cohorts) {
177 final HistoryReconnectCohort cohort = history.startReconnect(newConn);
178 if (cohort != null) {
185 // Methods below are invoked from application threads
190 public final ClientLocalHistory createLocalHistory() {
191 final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
192 nextHistoryId.getAndIncrement());
194 final long stamp = lock.readLock();
196 if (aborted != null) {
197 Throwables.throwIfUnchecked(aborted);
198 throw new IllegalStateException(aborted);
201 final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
202 LOG.debug("{}: creating a new local history {}", persistenceId(), history);
204 Verify.verify(histories.put(historyId, history) == null);
207 lock.unlockRead(stamp);
212 public final ClientTransaction createTransaction() {
213 return singleHistory.createTransaction();
217 public final ClientSnapshot createSnapshot() {
218 return singleHistory.takeSnapshot();
222 public void close() {
224 context().executeInActor(this::shutdown);
// Resolves the shard responsible for the given path; the returned Long is presumably the shard
// cookie used by the backend resolver — subclass-specific, TODO confirm against implementations.
abstract Long resolveShardForPath(YangInstanceIdentifier path);
// Enumerates the identifiers of all shards known to this client; subclass-specific.
abstract Stream<Long> resolveAllShards();
231 final ActorUtils actorUtils() {
232 return ((AbstractShardBackendResolver) resolver()).actorUtils();