4f91cb27fae151a26ba5c49dbd8ba12498dc8238
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.StampedLock;
20 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
21 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
22 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
23 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
25 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
26 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
33  * frontend.
34  *
35  * <p>
36  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
37  * Java world outside of actor containment are captured in {@link DataStoreClient}.
38  *
39  * <p>
40  * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
41  *            When touching internal state, be mindful of the execution context from which execution context, Actor
42  *            or POJO, is the state being accessed or modified.
43  *
44  * <p>
45  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
46  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
47  *                doubt, feel free to synchronize on this object.
48  *
49  * <p>
50  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
51  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
52  *              for correctness and performance impact.
53  *
54  * <p>
55  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
56  *             for performing work and charging applications for it. That has two positive effects:
57  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
58  *             - CPU usage provides back-pressure towards the application.
59  *
60  * @author Robert Varga
61  */
62 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
63         implements DataStoreClient {
64     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
65
66     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
67     private final AtomicLong nextHistoryId = new AtomicLong(1);
68     private final StampedLock lock = new StampedLock();
69     private final SingleClientHistory singleHistory;
70
71     private volatile Throwable aborted;
72
73     AbstractDataStoreClientBehavior(final ClientActorContext context,
74             final BackendInfoResolver<ShardBackendInfo> resolver) {
75         super(context, resolver);
76         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
77     }
78
79     //
80     //
81     // Methods below are invoked from the client actor thread
82     //
83     //
84
85     @Override
86     protected final void haltClient(final Throwable cause) {
87         // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
88         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
89         // thread.
90         if (aborted != null) {
91             abortOperations(cause);
92         }
93     }
94
95     private void abortOperations(final Throwable cause) {
96         final long stamp = lock.writeLock();
97         try {
98             // This acts as a barrier, application threads check this after they have added an entry in the maps,
99             // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
100             aborted = cause;
101
102             for (ClientLocalHistory h : histories.values()) {
103                 h.localAbort(cause);
104             }
105             histories.clear();
106         } finally {
107             lock.unlockWrite(stamp);
108         }
109     }
110
111     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
112         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
113         return null;
114     }
115
116     @Override
117     protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
118         if (command instanceof GetClientRequest) {
119             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
120         } else {
121             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
122         }
123
124         return this;
125     }
126
127     /*
128      * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
129      * involved, as the messages need to be replayed to the individual proxies.
130      */
131     @Override
132     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
133         final long stamp = lock.writeLock();
134
135         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
136         //         further TransactionProxies can be created and we can safely traverse maps without risking
137         //         missing an entry
138         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
139         startReconnect(singleHistory, newConn, cohorts);
140         for (ClientLocalHistory h : histories.values()) {
141             startReconnect(h, newConn, cohorts);
142         }
143
144         return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
145     }
146
147     private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
148             final long stamp, final Collection<HistoryReconnectCohort> cohorts,
149             final Collection<ConnectionEntry> previousEntries) {
150         try {
151             // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
152             //         the non-throttling interface to the connection, hence we use a wrapper consumer
153             for (HistoryReconnectCohort c : cohorts) {
154                 c.replayRequests(previousEntries);
155             }
156
157             // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
158             //         requests will be immediately sent to it and requests being sent concurrently will get
159             //         forwarded once they hit the new connection.
160             return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
161         } finally {
162             try {
163                 // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
164                 for (HistoryReconnectCohort c : cohorts) {
165                     c.close();
166                 }
167             } finally {
168                 lock.unlockWrite(stamp);
169             }
170         }
171     }
172
173     private static void startReconnect(final AbstractClientHistory history,
174             final ConnectedClientConnection<ShardBackendInfo> newConn,
175             final Collection<HistoryReconnectCohort> cohorts) {
176         final HistoryReconnectCohort cohort = history.startReconnect(newConn);
177         if (cohort != null) {
178             cohorts.add(cohort);
179         }
180     }
181
182     //
183     //
184     // Methods below are invoked from application threads
185     //
186     //
187
188     @Override
189     public final ClientLocalHistory createLocalHistory() {
190         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
191             nextHistoryId.getAndIncrement());
192
193         final long stamp = lock.readLock();
194         try {
195             if (aborted != null) {
196                 Throwables.throwIfUnchecked(aborted);
197                 throw new RuntimeException(aborted);
198             }
199
200             final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
201             LOG.debug("{}: creating a new local history {}", persistenceId(), history);
202
203             Verify.verify(histories.put(historyId, history) == null);
204             return history;
205         } finally {
206             lock.unlockRead(stamp);
207         }
208     }
209
210     @Override
211     public final ClientTransaction createTransaction() {
212         return singleHistory.createTransaction();
213     }
214
215     @Override
216     public final ClientSnapshot createSnapshot() {
217         return singleHistory.takeSnapshot();
218     }
219
220     @Override
221     public void close() {
222         super.close();
223         context().executeInActor(this::shutdown);
224     }
225
226     abstract Long resolveShardForPath(YangInstanceIdentifier path);
227 }