BUG-5280: add a section about actor model
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / site / asciidoc / distributed-data-store.adoc
1 = Distributed Data Store implementation
2
3 [NOTE]
4 .This is evolving document
5
6 This document is in no sense complete, it describes the intenteded implementation.
7 It is subject to revisions as feedback is received and as the implementation evolves.
8
9 == Overview
10 The implementation builds on `sal-akka-raft` and adds business logic to
11 create a strongly-consistent, replicated, `DataTree`. It does this by introducing
12 a finite state machine to interact with local and remote clients and replicates
13 this using RAFT. The FSM is built around an internal `InMemoryDataTree` instance,
14 which holds the actual data being replicated and takes care of performing
15 individual operations (bundled in transactions) on the data.
16
17 It exposes an MD-SAL `DOMDataBroker` interface, which individual applications use,
18 directly, or via the Binding Adaptor, to perform operations.
19
20 The implementation is split into two major blocks:
21
22 Backend::
23   The actual implementation of the replicated data tree. It is pivoted
24   around Shard, a subclass of `RaftActor`, and provides a set of messages for the
25   *Frontend* to submit work to the Leader, which is then replicated to Followers.
26
27 Frontend::
28   The MD-SAL layer, which is co-located with the application accessing
29   data stored in the data store. It is responsible for implementing MD-SAL
30   `DOMDataBroker` and related interfaces and translate them to messages, which drive
31   the Backend. It is also responsible with dealing with common failures occuring
32   in communication with the Backend, such as messages getting lost and Shard
33   Leader moving in the system, to the extent reasonably possible. In current
34   implementation this maps to a ConcurrentDataBroker instance.
35
36 Both Backend and Frontend share a common Actor System on the local node. Nodes are
37 connected together using Akka Clustering. There can be at most one Frontend instance
38 running in an Actor System. For each individual shard, there can be at most one
39 instance running in an Actor System.
40
41 == Concepts
42
43 Shard Name::
44   The logical name of a part of the Conceptual Data Tree. It is
45   associated with a Shard instance. Each member can run at most one such instance
46   for a particular name. For current implementation it is a String, but that can
47   change to a more structured identifier (such as YangInstanceIdentifier) in future.
48
49 Transaction::
50   A set of operations on the data tree which are treated as an
51   atomic unit. All operations in a transaction share fate, e.g. either abort or
52   complete successfully.
53
54 Member Name::
55   The name for a logical node participating in Akka Clustering and
56   is synonymous to a Actor System instance. For deployment purposes, it is a simple
57   String, which uniquely identifies an OpenDaylight instance. This name does not
58   change for as long as an instance exists. From deployment perspective, a member
59   name can be changed only by removing that member and re-adding it. Any uncommitted
60   transactions from a removed member will be lost.
61
62 Global History::
63   Is the aggregated history of all transactions successfully
64   committed in the datastore. It is resilient to concurrent failure of N members,
65   if total number of members is (2N + 1).
66
67 Local History::
68   This is a generalized concept behind a `TransactionChain`. It
69   includes all transactions committed in the *Global History* at the point when
70   it was created and any transactions submitted in the `TransactionChain`. Local
71   History is always tied to a single Frontend incarnation and shares its fate.
72
73 [NOTE]
74 .When does Local History get updated with Global History so that it reflects transactions committed from other Local Histories?
75
76 TransactionChain implementations we have today deal with this by rebasing their
77 Local History when they have no outstanding transactions (e.g. have been
78 fully merged into Global History). This is problematic under load, because if
79 a TransactionChain is too busy to run out of outstanding transaction and will
80 keep retaining the view of Global History as it existed when the TransactionChain
81 was created -- preventing old state from ever being garbage-collected.
82
83 == Client/Leader locality
84
85 Akka Clustering provides location independence for actor message exchanges and
86 the data store could be implemented using a single messaging pattern. This would
87 (and in fact did) result in an implementation, which performs reasonably well.
88 Unfortunately it also prevents us from using features available in InMemoryDataTree,
89 which allows for better splitting of responsibilities between the Frontend and
90 the Backend by performing many preparatory tasks in the Frontend thread, without
91 the need for message passing.
92
93 Elimination of messaging provides significant improvement in operation throughput
94 and also application developer experience, as invalid operations are detected
95 immediately in the calling thread rather than being delivered when an attempt
96 to commit the transaction is made.
97
98 With this in mind, the Frontend/Backend internactions have two distinct patterns,
99 one used for when the Leader is known to be local and the other used when its
100 locality is either uncertain or known to be remote.
101
102 == Failure modes
103
104 As noted in the overview section, all frontend and backend instances operate in
105 a single Akka Cluster, which is logically composed of per-member actor systems.
106
107 This setup results in multiple distinct failure modes which need to be analyzed
108 separately for ways how they are detected, their effect on both Frontend and Backend
109 state and recovery strategies taken.
110
111 === Shard Leader Failure
112   A shard failure occurs when a Shard actor instance is stopped while it is in
113   the RAFT leader state, isolated or not. This failure is detected by the Actor
114   System, which will restart the actor. Upon restart, Shard starts in RAFT follower
115   state, unable to process Frontend transactions as it could before. Both Frontend
116   and Backend actors can observe message loss.
117
118 === Loss of Actor System connectivity
119   This failure mode occurs when connectivity between nodes is lost. This is primarily
120   detected by message timeouts, where messages to and from remote actors fail
121   to arrive. At the logic level this failure cannot be discerned from the remote
122   actors being too busy to process the request within the deadline. When a timeout
123   occurs the party which sent the request does not know if the request has been
124   processed, nor wether that request will be executed at some point in the future.
125
126 === Shard Leader component actor failure
127   Shard implementation makes use of child actors to allow horizontal scaling,
128   effectively creating a threadpool. Some of these actors track Frontend state
129   without persisting it, hence a loss of such an actor means that we have lost
130   some state Frontend expects us to have. The failure is detected by the Shard
131   actor.
132
133 == Dealing with failure modes
134
135 === Backend
136   The Backend deals with these by running RAFT, which uses heartbeats and reacts
137   accordingly by electing a new leader if necessary. A leader which cannot communicate
138   with its followers is prevented from making forward progress. State is reconciled
139   when the partition heals. This is possible because each state transition in
140   RAFT has a unique identifier (the journal index).
141
142   Shard Leader component actor failures are dealt with in a fail-fast manner, where
143   a tombstone for the Frontend component corresponding to the failed actor. Failure
144   cause is recorded in the tombstone and all subsequent Frontend messages towards
145   that actor are immediately rejected, stating the failure cause.
146
147 === Frontend
148   The Frontend needs to deal with this mode by eliminating unknowns to the extent
149   reasonably possible, or by propagating any unresolved unknowns to the user
150   application.
151
152   In order to achieve this, all state transitions need to have a globally-unique
153   identifier and the Backend needs to track which transitions have been acted on.
154   This way all state transitions can be made idempotent, e.g. the Backend will ignore
155   any state transition if its internal state indicate the transition has already been
156   performed.
157
158   Idempotent transitions allow the Frontend to re-send them until it receives
159   an authoritative reply from the Backend -- either confirming the state transition
160   or rejecting it, hence eliminating the unknown and allowing the Frontend to
161   continue operating on well-defined state.
162
163 == Identifying state transitions
164   Transition identifiers form a namespace from which each transition needs to get
165   a unique identifier, which makes allocation a performance-critical part
166   of operation and needs to be scaled.
167
168   Transitions are always initiated from the Frontend, hence the responsibility
169   for allocating and retiring each transition identifier lies ultimately with
170   the Frontend. This matter is slightly complicated by the fact that the Backend
171   needs to track transitions efficiently, hence we will define additional rules
172   when particular transition identifiers are implicitly retired based on events
173   observed by the Backend.
174
175   Due to the scaling requirement, the Transition Identifier namespace is a hierarchical,
176   where each level in the hierarchy is treated as a separate namespace, lending
177   itself to delegation.
178
179   The first level in the hierarchy is the Member Name where the originating Frontend
180   resides. Since Member Names are unique within a cluster, this allows for allocation
181   and retirement to be tied to member lifecycle. Allocation and retirement does not
182   need to be communicated across members. If a member is removed from a cluster,
183   all its Transition Identifiers are immediately retired. A Member may not reuse
184   a transition identifier it has used for communication until it has confirmed
185   that the Backend has acknowledged its implicit or explicit retirement.
186
187   The second level in the hierarchy is the Frontend Generation number. It is used
188   to distiguish Frontend instances on the same member across Frontend restarts.
189   We could use Akka Persistence or similar to persist Frontend state, all of the state
190   is inherently transitive and high-churn, so persisting it would lead to inferior
191   degradation. Every time a Frontend instance is started it is assigned a unique,
192   generation number. There are multiple mechanisms how such a number could be obtained,
193   such as UUID generation, but it is advantageous to have this number increasing
194   monotonically. For this reason we will use a locally-persisted number, which is
195   incremented (and persisted once) everytime a Frontend instance is started. We will
196   use an unsigned 64bit number, stored as a simple long, but treated as an unsigned
197   by using Long.*Unsigned*() methods to interact with it.
198
199   These two levels are combined into a FrontendIdentifier and embedded in every
200   message the Frontend sends to the Backend.
201
202   The third level in the hierarchy is Local History identifier, which identifies
203   a particular Local History managed by the Frontend. This is a 64bit unsigned
204   integer, which is unique for a particular Frontend Generation. It is monotonically
205   increasing from 1. Zero is treated as a 'no Local History' marker and is made
206   special only to support free-standing transactions without the need to define
207   a set of dedicated messages.
208
209   The fourth level is the transaction number. This 64bit number uniquely identifies
210   a particular transaction within a Local History. It is also monotonically incremented
211   for each transaction created in the Local History.
212
213   These two levels are combined into a LocalTransactionIdentifier, which uniquely
214   identifies any transaction originating from a Frontend instance. When combined
215   with FrontendIdentifier, it also uniquely identifies any transaction that has
216   ever executed in the cluster, e.g. forming a GlobalTransactionIdentifier.
217
218 == Persistence model
219
220 === Frontend persistence
221   As noted above, implementation of the Frontend Generation requires each member
222   to persistently store a single 64 bit counter. To keep the persistence interfaces
223   within a member consistent, Frontend uses Akka Persistence to maintain this
224   counter across restarts.
225
226   State for running transactions is kept in the Frontend until the backend confirms
227   it being completed. If an inconsistency occurs (due to messages getting lost
228   or Shard Leader failing), the Frontend will retransmit this state to the Backend.
229
230 === Backend persistence
231   The Backend needs to persist and replicate enough state to ensure that Shard
232   Leader movement does not render Local Histories inoperable, as that has
233   a direct impact on applications, which need to have a state reconciliation
234   strategy to deal with this situation. Development of complex strategies cannot
235   be expected from most applications, as their developers will implement
236   a simple strategy of performing a full state dump into the data store, for
237   example by flapping a BGP session. If the Shard Leader movement was caused
238   by the leader experiencing performance problems, such a dump could result
239   in the new Shard Leader failing for the same reason, which would lead to
240   the application not being able to make forward progress.
241
242   In order to achieve this, the Backend needs to persist both the data stored
243   by the application and enough metadata about Local Histories and Transactions
244   to make state recovery across shard leadership movement possible.
245
246 ==== User Data persistence
247   The Snapshot needs to identify all transactions which have been integrated
248   into it. Enumerating all of them is clearly not practical, hence we exploit
249   the monotonic nature of identifiers' relationship to Global History. For each
250   valid Local History, we only need to store the last transaction number, as
251   it implies all previous transactions from that Local History have been accepted
252   into Global History. This does not include any Local Histories which have
253   been retired by the Frontend, as any attempt to access their state would
254   mean use-after-free on Frontend's part. The same holds true of any metadata
255   about a Frontend generation -- as soon as the Shard Leader receives a message
256   from a newer generation, it can safely discard any metadata mentioning the
257   prior generation and consider all its identifiers as retired.
258
259   Each DataTreeCandidatePayload entry needs to contain
260   the GlobalTransactionIdentifier of the transaction which has caused the
261   delta carried in that entry. This information is used to summarize transaction
262   metadata when this entry is snapshotted.
263
264 ==== Internal state persistence
265   The Backend creates a hierarchy of actors to track interactions with the Frontend.
266   Information necessary to recreate these actors needs to be persisted within
267   the journal. This boils down to existence of Local Histories and the last
268   transaction committed for a particular history.
269
270   Whenever a Local History is created on the Frontend, a corresponding actor is
271   created on the Shard Leader. Before the leader acknowledges its creation, it
272   needs to persist a state transition, identifying the Local History. Whenever
273   a Local History is closed down, a corresponding tombstone is stored in the journal.
274   The last transaction identifier for a local history is tracked via snapshots
275   and DataTreeCandidatePayloads.
276
277   In order to minimize overhead on Followers, the actor hierarchy is not created
278   until the actor transitions to Shard Leader state. It only tracks the information
279   required.
280
281 == Actor model
282   Both Backend and Frontend are implemented as a set of cooperating actors. This section
283   outlines the structure and how it relates to user application interactions.
284
285 === Frontend actor model
286   The current implementation does not have explicit actors, which means that every
287   interaction with the Backend is done via Patterns.ask(), which creates implicit
288   actors. This model proposes to change that, making the model explicit -- allowing
289   us to better track state of interactions.
290
291 ==== FrontendActor
292   FrontendActor acts as the single, root-level actor. It is tied to
293   a ConcurrentDataBroker instance. Its resposibility is to maintain the generation
294   number (hence it is a PersistentActor), maintain information about individual
295   ShardLeader locations and direct this information to its children. It maintains
296   a set of LocalHistoryActors and a set of SingleTransactionActors.
297
298   Its responsibility is to talk to ShardManager and propagate information about
299   Shard Leader location towards its child actors.
300
301 ==== LocalHistoryActor
302   A LocalHistoryActor is tied to a DOMTransactionChain, maintains all local state
303   to it and also tracks its propagation to the Shard Leader. This is not a persistent
304   actor.
305
306   It maintains a relationship to its Backend counterpart and routes operations
307   (remote) and transactions (local) towards it. Should a request time out, it
308   transitions to idle state and informs FrontendActor, which will resume it once
309   the leader location information is refreshed.
310
311 ==== SingleTransactionActor
312   A SingleTransactionActor takes care of transactions which are not tied to a
313   LocalHistory, e.g. single transactions instatiated via
314   DOMDataBroker.newXXXTransaction()). These actors instantiated for each transaction
315   separately and get terminated once the transaction is completed, successfully or not.
316
317