= Distributed Data Store implementation

.This is an evolving document
This document is by no means complete; it describes the intended implementation.
It is subject to revision as feedback is received and as the implementation evolves.

The implementation builds on `sal-akka-raft` and adds the business logic needed to
create a strongly-consistent, replicated `DataTree`. It does this by introducing
a finite state machine to interact with local and remote clients, and replicating
this machine using RAFT. The FSM is built around an internal `InMemoryDataTree` instance,
which holds the actual data being replicated and takes care of performing
individual operations (bundled in transactions) on the data.
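
For illustration, the following sketch shows the three-phase cycle such an FSM
drives against its data tree, using the yangtools `DataTree` API. This is a minimal
sketch, not the actual implementation; the package layout is an assumption and
varies between yangtools releases.

[source,java]
----
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;

// Sketch of the three-phase cycle for applying one transaction.
final class CommitCycle {
    static void apply(DataTree tree, DataTreeModification mod)
            throws DataValidationFailedException {
        mod.ready();                                      // seal the transaction
        tree.validate(mod);                               // may reject conflicts
        DataTreeCandidate candidate = tree.prepare(mod);  // compute the delta
        tree.commit(candidate);                           // atomically publish it
    }
}
----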

The implementation exposes an MD-SAL `DOMDataBroker` interface, which individual
applications use, either directly or via the Binding Adaptor, to perform operations.

The implementation is split into two major blocks:

Backend::
The actual implementation of the replicated data tree. It is pivoted
around Shard, a subclass of `RaftActor`, and provides a set of messages for the
*Frontend* to submit work to the Leader, which is then replicated to Followers.

Frontend::
The MD-SAL layer, which is co-located with the application accessing
data stored in the data store. It is responsible for implementing the MD-SAL
`DOMDataBroker` and related interfaces and translating them into messages which drive
the Backend. It is also responsible for dealing with common failures occurring
in communication with the Backend, such as messages getting lost and the Shard
Leader moving within the system, to the extent reasonably possible. In the current
implementation this maps to a ConcurrentDataBroker instance.

Both Backend and Frontend share a common Actor System on the local node. Nodes are
connected together using Akka Clustering. There can be at most one Frontend instance
running in an Actor System. For each individual shard, there can be at most one
Shard instance running in an Actor System.

== Terminology

Shard Name::
The logical name of a part of the Conceptual Data Tree. It is
associated with a Shard instance. Each member can run at most one such instance
for a particular name. In the current implementation it is a String, but that may
change to a more structured identifier (such as `YangInstanceIdentifier`) in the future.

Transaction::
A set of operations on the data tree which are treated as an
atomic unit. All operations in a transaction share fate, i.e. they either all
abort or all complete successfully.

Member Name::
The name of a logical node participating in Akka Clustering; it is
synonymous with an Actor System instance. For deployment purposes, it is a simple
String which uniquely identifies an OpenDaylight instance. This name does not
change for as long as the instance exists. From a deployment perspective, a member
name can be changed only by removing that member and re-adding it. Any uncommitted
transactions from a removed member will be lost.

Global History::
The aggregated history of all transactions successfully
committed in the datastore. It is resilient to concurrent failure of N members
if the total number of members is (2N + 1).

Local History::
A generalized concept behind a `TransactionChain`. It
includes all transactions committed in the *Global History* at the point when
it was created and any transactions submitted in the `TransactionChain`. A Local
History is always tied to a single Frontend incarnation and shares its fate.

.When does a Local History get updated with Global History so that it reflects transactions committed from other Local Histories?
The TransactionChain implementations we have today deal with this by rebasing their
Local History when they have no outstanding transactions (i.e. have been
fully merged into Global History). This is problematic under load, because
a TransactionChain which is too busy to run out of outstanding transactions will
keep retaining the view of Global History as it existed when the TransactionChain
was created, preventing old state from ever being garbage-collected.

== Client/Leader locality

Akka Clustering provides location independence for actor message exchanges, and
the data store could be implemented using a single messaging pattern. This would
(and in fact did) result in an implementation which performs reasonably well.
Unfortunately it also prevents us from using features available in `InMemoryDataTree`
which allow for better splitting of responsibilities between the Frontend and
the Backend, by performing many preparatory tasks in the Frontend thread without
the need for message passing.

Eliminating messaging provides a significant improvement in operation throughput
and also in application developer experience, as invalid operations are detected
immediately in the calling thread rather than being reported when an attempt
to commit the transaction is made.

With this in mind, the Frontend/Backend interactions follow two distinct patterns:
one used when the Leader is known to be local, the other when its
locality is either uncertain or known to be remote.

== Failure modes

As noted in the overview section, all Frontend and Backend instances operate in
a single Akka Cluster, which is logically composed of per-member actor systems.

This setup results in multiple distinct failure modes, which need to be analyzed
separately for how they are detected, their effect on both Frontend and Backend
state, and the recovery strategies taken.

=== Shard Leader Failure

A shard failure occurs when a Shard actor instance is stopped while it is in
the RAFT leader state, isolated or not. This failure is detected by the Actor
System, which will restart the actor. Upon restart, the Shard starts in the RAFT
follower state, unable to process Frontend transactions as it could before. Both
Frontend and Backend actors can observe message loss.

=== Loss of Actor System connectivity

This failure mode occurs when connectivity between nodes is lost. It is primarily
detected via message timeouts, where messages to and from remote actors fail
to arrive. At the logical level this failure cannot be discerned from the remote
actors being too busy to process the request within the deadline. When a timeout
occurs, the party which sent the request does not know whether the request has been
processed, nor whether that request will be executed at some point in the future.

=== Shard Leader component actor failure

The Shard implementation makes use of child actors to allow horizontal scaling,
effectively creating a threadpool. Some of these actors track Frontend state
without persisting it, hence a loss of such an actor means that we have lost
some state the Frontend expects us to have. The failure is detected by the Shard,
which supervises its child actors.

== Dealing with failure modes

=== Backend

The Backend deals with these failures by running RAFT, which uses heartbeats and
reacts accordingly, electing a new leader if necessary. A leader which cannot
communicate with its followers is prevented from making forward progress. State
is reconciled when the partition heals. This is possible because each state
transition in RAFT has a unique identifier (the journal index).

Shard Leader component actor failures are dealt with in a fail-fast manner:
a tombstone is created for the Frontend component corresponding to the failed
actor. The failure cause is recorded in the tombstone and all subsequent Frontend
messages towards that actor are immediately rejected, stating the failure cause.
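
The following sketch illustrates the tombstone bookkeeping described above; the
class and method names are illustrative assumptions, not the actual implementation:

[source,java]
----
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Fail-fast tombstones: once a component actor dies, every subsequent
// Frontend message aimed at it is rejected with the recorded cause.
final class ComponentTombstones {
    private final Map<String, Throwable> tombstones = new ConcurrentHashMap<>();

    // Invoked when a component actor dies: remember why it failed.
    void recordFailure(String componentId, Throwable cause) {
        tombstones.putIfAbsent(componentId, cause);
    }

    // Invoked for every Frontend message routed to a component.
    void checkNotTombstoned(String componentId) {
        final Throwable cause = tombstones.get(componentId);
        if (cause != null) {
            throw new IllegalStateException("Component " + componentId + " has failed", cause);
        }
    }
}
----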

=== Frontend

The Frontend needs to deal with these modes by eliminating unknowns to the extent
reasonably possible, or by propagating any unresolved unknowns to the user.

In order to achieve this, all state transitions need to have a globally-unique
identifier and the Backend needs to track which transitions have been acted on.
This way all state transitions can be made idempotent, i.e. the Backend will ignore
any state transition if its internal state indicates the transition has already been
applied.

Idempotent transitions allow the Frontend to re-send them until it receives
an authoritative reply from the Backend, either confirming the state transition
or rejecting it, hence eliminating the unknown and allowing the Frontend to
continue operating on well-defined state.
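
In code, the idempotency contract could look like the following sketch. This is
illustrative only, assuming the monotonically increasing transition identifiers
defined in the next section:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Sketch: the Backend remembers, per Frontend, the last transition it has
// applied together with the reply it produced. A retransmission of that
// transition is not re-applied; its original reply is returned instead.
final class IdempotentBackend {
    private record Applied(long transitionId, Object reply) {}

    private final Map<String, Applied> lastApplied = new HashMap<>();

    Object process(String frontendId, long transitionId, Runnable transition, Object reply) {
        final Applied prev = lastApplied.get(frontendId);
        if (prev != null && transitionId == prev.transitionId()) {
            // Duplicate of the last transition: replay the authoritative answer.
            return prev.reply();
        }
        transition.run();  // apply the state transition exactly once
        lastApplied.put(frontendId, new Applied(transitionId, reply));
        return reply;
    }
}
----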

== Identifying state transitions

Transition identifiers form a namespace from which each transition needs to get
a unique identifier. This makes allocation a performance-critical part
of operation, so it needs to scale.

Transitions are always initiated from the Frontend, hence the responsibility
for allocating and retiring each transition identifier lies ultimately with
the Frontend. This matter is slightly complicated by the fact that the Backend
needs to track transitions efficiently, hence we will define additional rules
for when particular transition identifiers are implicitly retired based on events
observed by the Backend.

Due to the scaling requirement, the Transition Identifier namespace is hierarchical,
where each level in the hierarchy is treated as a separate namespace, lending
itself to delegation.

The first level in the hierarchy is the Member Name where the originating Frontend
resides. Since Member Names are unique within a cluster, this allows allocation
and retirement to be tied to member lifecycle. Allocation and retirement do not
need to be communicated across members. If a member is removed from a cluster,
all its Transition Identifiers are immediately retired. A Member may not reuse
a transition identifier it has used for communication until it has confirmed
that the Backend has acknowledged its implicit or explicit retirement.

The second level in the hierarchy is the Frontend Generation number. It is used
to distinguish Frontend instances on the same member across Frontend restarts.
While we could use Akka Persistence or similar to persist the entire Frontend state,
all of that state is inherently transient and high-churn, so persisting it would
lead to performance degradation. Every time a Frontend instance is started it is
assigned a unique generation number. There are multiple mechanisms by which such
a number could be obtained, such as UUID generation, but it is advantageous to have
this number increase monotonically. For this reason we will use a locally-persisted
number, which is incremented (and persisted once) every time a Frontend instance
is started. We will use an unsigned 64-bit number, stored as a simple long, but
treated as unsigned by using the Long.*Unsigned*() methods to interact with it.
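
A minimal sketch of such a counter in Java, assuming the unsigned interpretation
described above; the helper names are illustrative:

[source,java]
----
// Illustrative helpers showing how a plain Java long can carry an
// unsigned 64-bit generation number.
final class Generations {
    // Values beyond 2^63 - 1 are negative when viewed as signed longs, so
    // ordinary comparison operators would order them incorrectly.
    static boolean isNewer(long candidate, long current) {
        return Long.compareUnsigned(candidate, current) > 0;
    }

    // Rendering also needs the unsigned variant to avoid a leading minus sign.
    static String format(long generation) {
        return Long.toUnsignedString(generation);
    }
}
----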

These two levels are combined into a FrontendIdentifier and embedded in every
message the Frontend sends to the Backend.

The third level in the hierarchy is the Local History identifier, which identifies
a particular Local History managed by the Frontend. This is a 64-bit unsigned
integer, unique within a particular Frontend Generation and monotonically
increasing from 1. Zero is treated as a 'no Local History' marker and is made
special only to support free-standing transactions without the need to define
a set of dedicated messages.

The fourth level is the transaction number. This 64-bit number uniquely identifies
a particular transaction within a Local History. It is monotonically incremented
for each transaction created in the Local History.

These two levels are combined into a LocalTransactionIdentifier, which uniquely
identifies any transaction originating from a Frontend instance. When combined
with the FrontendIdentifier, it also uniquely identifies any transaction that has
ever executed in the cluster, i.e. it forms a GlobalTransactionIdentifier.
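
For illustration, the hierarchy could be modelled with the following value types.
These records are a sketch only, not the classes used by the implementation:

[source,java]
----
// Levels 1 and 2: identifies a Frontend incarnation cluster-wide.
record FrontendIdentifier(String memberName, long generation) {}

// Levels 3 and 4: identifies a transaction within one Frontend incarnation.
// A historyId of 0 denotes a free-standing (chain-less) transaction.
record LocalTransactionIdentifier(long historyId, long transactionId) {}

// Combining both yields an identifier unique across the cluster's lifetime.
record GlobalTransactionIdentifier(FrontendIdentifier frontend,
    LocalTransactionIdentifier transaction) {}
----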

== Persistence

=== Frontend persistence

As noted above, implementation of the Frontend Generation requires each member
to persistently store a single 64-bit counter. To keep the persistence interfaces
within a member consistent, the Frontend uses Akka Persistence to maintain this
counter across restarts.
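
A minimal sketch of such a persistent counter, using Akka Persistence's Java API;
the actor, message, and event names here are illustrative assumptions:

[source,java]
----
import akka.persistence.AbstractPersistentActor;

// Sketch: the only thing persisted is a bump of the generation counter,
// recorded once per Frontend start. Recovery replays prior bumps.
final class GenerationCounter extends AbstractPersistentActor {
    private static final class Bumped implements java.io.Serializable {}

    private long generation;  // interpreted as unsigned 64-bit

    @Override
    public String persistenceId() {
        return "frontend-generation";
    }

    @Override
    public Receive createReceiveRecover() {
        // Replay increments recorded by previous incarnations.
        return receiveBuilder().match(Bumped.class, evt -> generation++).build();
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            // Sent once when the Frontend starts: persist first, then bump.
            .matchEquals("bump", msg -> persist(new Bumped(), evt -> generation++))
            .build();
    }
}
----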

State for running transactions is kept in the Frontend until the Backend confirms
their completion. If an inconsistency occurs (due to messages getting lost
or the Shard Leader failing), the Frontend will retransmit this state to the Backend.

=== Backend persistence

The Backend needs to persist and replicate enough state to ensure that Shard
Leader movement does not render Local Histories inoperable, as that has
a direct impact on applications, which would need a state reconciliation
strategy to deal with this situation. Development of complex strategies cannot
be expected from most applications; their developers will rather implement
a simple strategy of performing a full state dump into the data store, for
example by flapping a BGP session. If the Shard Leader movement was caused
by the leader experiencing performance problems, such a dump could result
in the new Shard Leader failing for the same reason, which would leave
the application unable to make forward progress.

In order to achieve this, the Backend needs to persist both the data stored
by the application and enough metadata about Local Histories and Transactions
to make state recovery across Shard leadership movement possible.

==== User Data persistence

The Snapshot needs to identify all transactions which have been integrated
into it. Enumerating all of them is clearly not practical, hence we exploit
the monotonic nature of the identifiers' relationship to Global History. For each
valid Local History, we only need to store the last transaction number, as
it implies all previous transactions from that Local History have been accepted
into Global History. This does not include any Local Histories which have
been retired by the Frontend, as any attempt to access their state would
mean a use-after-free on the Frontend's part. The same holds true of any metadata
about a Frontend generation: as soon as the Shard Leader receives a message
from a newer generation, it can safely discard any metadata mentioning the
prior generation and consider all its identifiers retired.
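
The bookkeeping this implies can be sketched as follows; the shape and names are
illustrative, not the actual implementation:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Sketch of the metadata folded into a snapshot: one last-transaction
// number per live Local History, with wholesale cleanup when a newer
// Frontend generation appears.
final class FrontendGenerationMetadata {
    private long generation;
    private final Map<Long, Long> lastTxPerHistory = new HashMap<>();

    void onTransactionCommitted(long txGeneration, long historyId, long txId) {
        if (Long.compareUnsigned(txGeneration, generation) > 0) {
            // A newer Frontend incarnation: all identifiers belonging to the
            // prior generation are implicitly retired.
            lastTxPerHistory.clear();
            generation = txGeneration;
        }
        // Monotonic numbering: storing the last transaction number implies
        // all previous transactions of this history are in Global History.
        lastTxPerHistory.put(historyId, txId);
    }

    void onHistoryRetired(long historyId) {
        // A retired history must never be referenced again; drop its entry.
        lastTxPerHistory.remove(historyId);
    }
}
----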

Each DataTreeCandidatePayload entry needs to contain
the GlobalTransactionIdentifier of the transaction which has caused the
delta carried in that entry. This information is used to summarize transaction
metadata when the entry is snapshotted.

==== Internal state persistence

The Backend creates a hierarchy of actors to track interactions with the Frontend.
Information necessary to recreate these actors needs to be persisted in
the journal. This boils down to the existence of Local Histories and the last
transaction committed for a particular history.

Whenever a Local History is created on the Frontend, a corresponding actor is
created on the Shard Leader. Before the leader acknowledges its creation, it
needs to persist a state transition identifying the Local History. Whenever
a Local History is closed down, a corresponding tombstone is stored in the journal.
The last transaction identifier for a Local History is tracked via snapshots
and DataTreeCandidatePayloads.
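
As an illustration, the journal entries involved could look like the following
sketch; the event shapes and the persist-then-acknowledge flow are assumptions,
not the actual implementation:

[source,java]
----
import java.io.Serializable;

// Hypothetical journal events capturing Local History lifecycle.
record HistoryCreated(long historyId) implements Serializable {}
record HistoryClosed(long historyId) implements Serializable {}

// Inside the Shard Leader (sketch, assuming an Akka persistent actor):
//
//   persist(new HistoryCreated(id), evt -> {
//       spawnHistoryActor(id);                          // create the child
//       frontend.tell(new CreateSuccess(id), self());   // ack only after persist
//   });
----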

In order to minimize overhead on Followers, the actor hierarchy is not created
until the actor transitions to the Shard Leader state. Until then, it only tracks
the information necessary to create that hierarchy.

== Actor model

Both Backend and Frontend are implemented as a set of cooperating actors. This section
outlines the structure and how it relates to user application interactions.

=== Frontend actor model

The current implementation does not have explicit actors, which means that every
interaction with the Backend is done via Patterns.ask(), which creates implicit
actors. This model proposes to change that, making the actors explicit and allowing
us to better track the state of interactions.

==== FrontendActor

The FrontendActor acts as the single, root-level actor. It is tied to
a ConcurrentDataBroker instance. Its responsibility is to maintain the generation
number (hence it is a PersistentActor), maintain information about individual
Shard Leader locations and direct this information to its children. It maintains
a set of LocalHistoryActors and a set of SingleTransactionActors.

Its responsibility is to talk to the ShardManager and propagate information about
Shard Leader locations towards its child actors.

==== LocalHistoryActor

A LocalHistoryActor is tied to a DOMTransactionChain; it maintains all local state
related to the chain and also tracks its propagation to the Shard Leader. This is
not a persistent actor.

It maintains a relationship with its Backend counterpart and routes operations
(remote) and transactions (local) towards it.

Requests sent from the frontend to the local history actor are subject to timeouts.
If a timeout occurs, the frontend is expected to perform full Backend leader
discovery, contact the leader, and reconcile transaction state with it.

In order to minimize messaging latency while maintaining resiliency to message
loss (due to TCP connectivity issues), there are two messaging patterns:

* Frontend talks to the Shard. This is used during instantiation of a local
history, as well as for state reconciliation after a retriable failure has
occurred (such as a request timeout).

* Frontend talks to the Local History Actor. This is used during transaction
operation, i.e. while a transaction is being built up. Once the transaction
is readied, the LHA will send a message to the Shard, which takes ownership
of that transaction's state. Since this handoff is not observable by
the frontend (observing it would require an additional round-trip), the Shard
replicates its responses to the LHA. In case a timeout occurs, the frontend
will contact the LHA (as that is the last endpoint it knows about), which
will replay the Shard message, as illustrated in the sketch after this list.
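
A sketch of the replay behaviour implied by the second pattern; the class and
method names are illustrative assumptions:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Sketch: the LHA retains responses the Shard has replicated to it, keyed
// by transaction, so a frontend retry after a timeout can be answered
// without involving the Shard again.
final class ShardResponseCache {
    private final Map<Long, Object> responses = new HashMap<>();

    // The Shard replicates each of its responses to the LHA.
    void onShardResponse(long txId, Object response) {
        responses.put(txId, response);
    }

    // On a frontend retry: replay the cached response, or null if the Shard
    // has not produced one yet.
    Object replay(long txId) {
        return responses.get(txId);
    }
}
----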

Messaging between a Local History Actor and its associated Shard is expected
to be reliable. This is guaranteed by the parent/child relationship, where
the Shard is the parent. If a Shard fails, all of its children will be restarted,
losing all state. If an LHA fails, the Shard will be notified via Akka DeathWatch.

==== SingleTransactionActor

A SingleTransactionActor takes care of transactions which are not tied to a
Local History, i.e. single transactions instantiated via
DOMDataBroker.newXXXTransaction(). These actors are instantiated for each transaction
separately and are terminated once the transaction completes, successfully or not.
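
For reference, the free-standing path corresponds to broker usage like the
following sketch, using the controller-era MD-SAL API; exact package names may
differ by release:

[source,java]
----
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;

// A free-standing transaction: created directly from the broker, outside
// any transaction chain, hence served by a SingleTransactionActor.
final class FreeStandingWrite {
    static void write(DOMDataBroker broker, YangInstanceIdentifier path,
            NormalizedNode<?, ?> data) {
        final DOMDataWriteTransaction tx = broker.newWriteOnlyTransaction();
        tx.put(LogicalDatastoreType.OPERATIONAL, path, data);
        tx.submit();  // asynchronous outcome reported via the returned future
    }
}
----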