7187f83a1ac060d41341c110181877b9535b2985
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.StampedLock;
20 import java.util.stream.Stream;
21 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
22 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
23 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
25 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
26 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
27 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
34  * frontend.
35  *
36  * <p>
37  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
38  * Java world outside of actor containment are captured in {@link DataStoreClient}.
39  *
40  * <p>
41  * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
42  *            When touching internal state, be mindful of the execution context from which execution context, Actor
43  *            or POJO, is the state being accessed or modified.
44  *
45  * <p>
46  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
47  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
48  *                doubt, feel free to synchronize on this object.
49  *
50  * <p>
51  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
52  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
53  *              for correctness and performance impact.
54  *
55  * <p>
56  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
57  *             for performing work and charging applications for it. That has two positive effects:
58  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
59  *             - CPU usage provides back-pressure towards the application.
60  *
61  * @author Robert Varga
62  */
63 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
64         implements DataStoreClient {
65     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
66
67     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
68     private final AtomicLong nextHistoryId = new AtomicLong(1);
69     private final StampedLock lock = new StampedLock();
70     private final SingleClientHistory singleHistory;
71
72     private volatile Throwable aborted;
73
74     AbstractDataStoreClientBehavior(final ClientActorContext context,
75             final AbstractShardBackendResolver resolver) {
76         super(context, resolver);
77         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
78     }
79
80     //
81     //
82     // Methods below are invoked from the client actor thread
83     //
84     //
85
86     @Override
87     protected final void haltClient(final Throwable cause) {
88         // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
89         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
90         // thread.
91         if (aborted != null) {
92             abortOperations(cause);
93         }
94     }
95
96     private void abortOperations(final Throwable cause) {
97         final long stamp = lock.writeLock();
98         try {
99             // This acts as a barrier, application threads check this after they have added an entry in the maps,
100             // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
101             aborted = cause;
102
103             for (ClientLocalHistory h : histories.values()) {
104                 h.localAbort(cause);
105             }
106             histories.clear();
107         } finally {
108             lock.unlockWrite(stamp);
109         }
110     }
111
112     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
113         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
114         return null;
115     }
116
117     @Override
118     protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
119         if (command instanceof GetClientRequest) {
120             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
121         } else {
122             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
123         }
124
125         return this;
126     }
127
128     /*
129      * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
130      * involved, as the messages need to be replayed to the individual proxies.
131      */
132     @Override
133     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
134         final long stamp = lock.writeLock();
135
136         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
137         //         further TransactionProxies can be created and we can safely traverse maps without risking
138         //         missing an entry
139         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
140         startReconnect(singleHistory, newConn, cohorts);
141         for (ClientLocalHistory h : histories.values()) {
142             startReconnect(h, newConn, cohorts);
143         }
144
145         return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
146     }
147
148     private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
149             final long stamp, final Collection<HistoryReconnectCohort> cohorts,
150             final Collection<ConnectionEntry> previousEntries) {
151         try {
152             // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
153             //         the non-throttling interface to the connection, hence we use a wrapper consumer
154             for (HistoryReconnectCohort c : cohorts) {
155                 c.replayRequests(previousEntries);
156             }
157
158             // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
159             //         requests will be immediately sent to it and requests being sent concurrently will get
160             //         forwarded once they hit the new connection.
161             return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
162         } finally {
163             try {
164                 // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
165                 for (HistoryReconnectCohort c : cohorts) {
166                     c.close();
167                 }
168             } finally {
169                 lock.unlockWrite(stamp);
170             }
171         }
172     }
173
174     private static void startReconnect(final AbstractClientHistory history,
175             final ConnectedClientConnection<ShardBackendInfo> newConn,
176             final Collection<HistoryReconnectCohort> cohorts) {
177         final HistoryReconnectCohort cohort = history.startReconnect(newConn);
178         if (cohort != null) {
179             cohorts.add(cohort);
180         }
181     }
182
183     //
184     //
185     // Methods below are invoked from application threads
186     //
187     //
188
189     @Override
190     public final ClientLocalHistory createLocalHistory() {
191         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
192             nextHistoryId.getAndIncrement());
193
194         final long stamp = lock.readLock();
195         try {
196             if (aborted != null) {
197                 Throwables.throwIfUnchecked(aborted);
198                 throw new RuntimeException(aborted);
199             }
200
201             final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
202             LOG.debug("{}: creating a new local history {}", persistenceId(), history);
203
204             Verify.verify(histories.put(historyId, history) == null);
205             return history;
206         } finally {
207             lock.unlockRead(stamp);
208         }
209     }
210
211     @Override
212     public final ClientTransaction createTransaction() {
213         return singleHistory.createTransaction();
214     }
215
216     @Override
217     public final ClientSnapshot createSnapshot() {
218         return singleHistory.takeSnapshot();
219     }
220
221     @Override
222     public void close() {
223         super.close();
224         context().executeInActor(this::shutdown);
225     }
226
227     abstract Long resolveShardForPath(YangInstanceIdentifier path);
228
229     abstract Stream<Long> resolveAllShards();
230
231     final ActorUtils actorUtils() {
232         return ((AbstractShardBackendResolver) resolver()).actorUtils();
233     }
234 }