--- /dev/null
+= Distributed Data Store implementation
+
+[NOTE]
+.This is evolving document
+
+This document is in no sense complete, it describes the intenteded implementation.
+It is subject to revisions as feedback is received and as the implementation evolves.
+
+== Overview
+The implementation builds on `sal-akka-raft` and adds business logic to
+create a strongly-consistent, replicated, `DataTree`. It does this by introducing
+a finite state machine to interact with local and remote clients and replicates
+this using RAFT. The FSM is built around an internal `InMemoryDataTree` instance,
+which holds the actual data being replicated and takes care of performing
+individual operations (bundled in transactions) on the data.
+
+It exposes an MD-SAL `DOMDataBroker` interface, which individual applications use,
+directly, or via the Binding Adaptor, to perform operations.
+
+The implementation is split into two major blocks:
+
+Backend::
+ The actual implementation of the replicated data tree. It is pivoted
+ around Shard, a subclass of `RaftActor`, and provides a set of messages for the
+ *Frontend* to submit work to the Leader, which is then replicated to Followers.
+
+Frontend::
+ The MD-SAL layer, which is co-located with the application accessing
+ data stored in the data store. It is responsible for implementing MD-SAL
+ `DOMDataBroker` and related interfaces and translate them to messages, which drive
+ the Backend. It is also responsible with dealing with common failures occuring
+ in communication with the Backend, such as messages getting lost and Shard
+ Leader moving in the system, to the extent reasonably possible. In current
+ implementation this maps to a ConcurrentDataBroker instance.
+
+Both Backend and Frontend share a common Actor System on the local node. Nodes are
+connected together using Akka Clustering. There can be at most one Frontend instance
+running in an Actor System. For each individual shard, there can be at most one
+instance running in an Actor System.
+
+== Concepts
+
+Shard Name::
+ The logical name of a part of the Conceptual Data Tree. It is
+ associated with a Shard instance. Each member can run at most one such instance
+ for a particular name. For current implementation it is a String, but that can
+ change to a more structured identifier (such as YangInstanceIdentifier) in future.
+
+Transaction::
+ A set of operations on the data tree which are treated as an
+ atomic unit. All operations in a transaction share fate, e.g. either abort or
+ complete successfully.
+
+Member Name::
+ The name for a logical node participating in Akka Clustering and
+ is synonymous to a Actor System instance. For deployment purposes, it is a simple
+ String, which uniquely identifies an OpenDaylight instance. This name does not
+ change for as long as an instance exists. From deployment perspective, a member
+ name can be changed only by removing that member and re-adding it. Any uncommitted
+ transactions from a removed member will be lost.
+
+Global History::
+ Is the aggregated history of all transactions successfully
+ committed in the datastore. It is resilient to concurrent failure of N members,
+ if total number of members is (2N + 1).
+
+Local History::
+ This is a generalized concept behind a `TransactionChain`. It
+ includes all transactions committed in the *Global History* at the point when
+ it was created and any transactions submitted in the `TransactionChain`. Local
+ History is always tied to a single Frontend incarnation and shares its fate.
+
+[NOTE]
+.When does Local History get updated with Global History so that it reflects transactions committed from other Local Histories?
+
+TransactionChain implementations we have today deal with this by rebasing their
+Local History when they have no outstanding transactions (e.g. have been
+fully merged into Global History). This is problematic under load, because if
+a TransactionChain is too busy to run out of outstanding transaction and will
+keep retaining the view of Global History as it existed when the TransactionChain
+was created -- preventing old state from ever being garbage-collected.
+
+== Client/Leader locality
+
+Akka Clustering provides location independence for actor message exchanges and
+the data store could be implemented using a single messaging pattern. This would
+(and in fact did) result in an implementation, which performs reasonably well.
+Unfortunately it also prevents us from using features available in InMemoryDataTree,
+which allows for better splitting of responsibilities between the Frontend and
+the Backend by performing many preparatory tasks in the Frontend thread, without
+the need for message passing.
+
+Elimination of messaging provides significant improvement in operation throughput
+and also application developer experience, as invalid operations are detected
+immediately in the calling thread rather than being delivered when an attempt
+to commit the transaction is made.
+
+With this in mind, the Frontend/Backend internactions have two distinct patterns,
+one used for when the Leader is known to be local and the other used when its
+locality is either uncertain or known to be remote.
+
+== Failure modes
+
+As noted in the overview section, all frontend and backend instances operate in
+a single Akka Cluster, which is logically composed of per-member actor systems.
+
+This setup results in multiple distinct failure modes which need to be analyzed
+separately for ways how they are detected, their effect on both Frontend and Backend
+state and recovery strategies taken.
+
+=== Shard Leader Failure
+ A shard failure occurs when a Shard actor instance is stopped while it is in
+ the RAFT leader state, isolated or not. This failure is detected by the Actor
+ System, which will restart the actor. Upon restart, Shard starts in RAFT follower
+ state, unable to process Frontend transactions as it could before. Both Frontend
+ and Backend actors can observe message loss.
+
+=== Loss of Actor System connectivity
+ This failure mode occurs when connectivity between nodes is lost. This is primarily
+ detected by message timeouts, where messages to and from remote actors fail
+ to arrive. At the logic level this failure cannot be discerned from the remote
+ actors being too busy to process the request within the deadline. When a timeout
+ occurs the party which sent the request does not know if the request has been
+ processed, nor wether that request will be executed at some point in the future.
+
+=== Shard Leader component actor failure
+ Shard implementation makes use of child actors to allow horizontal scaling,
+ effectively creating a threadpool. Some of these actors track Frontend state
+ without persisting it, hence a loss of such an actor means that we have lost
+ some state Frontend expects us to have. The failure is detected by the Shard
+ actor.
+
+== Dealing with failure modes
+
+=== Backend
+ The Backend deals with these by running RAFT, which uses heartbeats and reacts
+ accordingly by electing a new leader if necessary. A leader which cannot communicate
+ with its followers is prevented from making forward progress. State is reconciled
+ when the partition heals. This is possible because each state transition in
+ RAFT has a unique identifier (the journal index).
+
+ Shard Leader component actor failures are dealt with in a fail-fast manner, where
+ a tombstone for the Frontend component corresponding to the failed actor. Failure
+ cause is recorded in the tombstone and all subsequent Frontend messages towards
+ that actor are immediately rejected, stating the failure cause.
+
+=== Frontend
+ The Frontend needs to deal with this mode by eliminating unknowns to the extent
+ reasonably possible, or by propagating any unresolved unknowns to the user
+ application.
+
+ In order to achieve this, all state transitions need to have a globally-unique
+ identifier and the Backend needs to track which transitions have been acted on.
+ This way all state transitions can be made idempotent, e.g. the Backend will ignore
+ any state transition if its internal state indicate the transition has already been
+ performed.
+
+ Idempotent transitions allow the Frontend to re-send them until it receives
+ an authoritative reply from the Backend -- either confirming the state transition
+ or rejecting it, hence eliminating the unknown and allowing the Frontend to
+ continue operating on well-defined state.
+
+== Identifying state transitions
+ Transition identifiers form a namespace from which each transition needs to get
+ a unique identifier, which makes allocation a performance-critical part
+ of operation and needs to be scaled.
+
+ Transitions are always initiated from the Frontend, hence the responsibility
+ for allocating and retiring each transition identifier lies ultimately with
+ the Frontend. This matter is slightly complicated by the fact that the Backend
+ needs to track transitions efficiently, hence we will define additional rules
+ when particular transition identifiers are implicitly retired based on events
+ observed by the Backend.
+
+ Due to the scaling requirement, the Transition Identifier namespace is a hierarchical,
+ where each level in the hierarchy is treated as a separate namespace, lending
+ itself to delegation.
+
+ The first level in the hierarchy is the Member Name where the originating Frontend
+ resides. Since Member Names are unique within a cluster, this allows for allocation
+ and retirement to be tied to member lifecycle. Allocation and retirement does not
+ need to be communicated across members. If a member is removed from a cluster,
+ all its Transition Identifiers are immediately retired. A Member may not reuse
+ a transition identifier it has used for communication until it has confirmed
+ that the Backend has acknowledged its implicit or explicit retirement.
+
+ The second level in the hierarchy is the Frontend Generation number. It is used
+ to distiguish Frontend instances on the same member across Frontend restarts.
+ We could use Akka Persistence or similar to persist Frontend state, all of the state
+ is inherently transitive and high-churn, so persisting it would lead to inferior
+ degradation. Every time a Frontend instance is started it is assigned a unique,
+ generation number. There are multiple mechanisms how such a number could be obtained,
+ such as UUID generation, but it is advantageous to have this number increasing
+ monotonically. For this reason we will use a locally-persisted number, which is
+ incremented (and persisted once) everytime a Frontend instance is started. We will
+ use an unsigned 64bit number, stored as a simple long, but treated as an unsigned
+ by using Long.*Unsigned*() methods to interact with it.
+
+ These two levels are combined into a FrontendIdentifier and embedded in every
+ message the Frontend sends to the Backend.
+
+ The third level in the hierarchy is Local History identifier, which identifies
+ a particular Local History managed by the Frontend. This is a 64bit unsigned
+ integer, which is unique for a particular Frontend Generation. It is monotonically
+ increasing from 1. Zero is treated as a 'no Local History' marker and is made
+ special only to support free-standing transactions without the need to define
+ a set of dedicated messages.
+
+ The fourth level is the transaction number. This 64bit number uniquely identifies
+ a particular transaction within a Local History. It is also monotonically incremented
+ for each transaction created in the Local History.
+
+ These two levels are combined into a LocalTransactionIdentifier, which uniquely
+ identifies any transaction originating from a Frontend instance. When combined
+ with FrontendIdentifier, it also uniquely identifies any transaction that has
+ ever executed in the cluster, e.g. forming a GlobalTransactionIdentifier.
+
+== Frontend persistence
+ As noted above, implementation of the Frontend Generation requires each member
+ to persistently store a single 64 bit counter. To keep the persistence interfaces
+ within a member consistent, Frontend uses Akka Persistence to maintain this
+ counter across restarts.
+
+ State for running transactions is kept in the Frontend until the backend confirms
+ it being completed. If an inconsistency occurs (due to messages getting lost
+ or Shard Leader failing), the Frontend will retransmit this state to the Backend.
+
+== Backend persistence
+ The Backend needs to persist and replicate enough state to ensure that Shard
+ Leader movement does not render Local Histories inoperable, as that has
+ a direct impact on applications, which need to have a state reconciliation
+ strategy to deal with this situation. Development of complex strategies cannot
+ be expected from most applications, as their developers will implement
+ a simple strategy of performing a full state dump into the data store, for
+ example by flapping a BGP session. If the Shard Leader movement was caused
+ by the leader experiencing performance problems, such a dump could result
+ in the new Shard Leader failing for the same reason, which would lead to
+ the application not being able to make forward progress.
+
+ In order to achieve this, the Backend needs to persist both the data stored
+ by the application and enough metadata about Local Histories and Transactions
+ to make state recovery across shard leadership movement possible.
+
+=== User Data persistence
+ The Snapshot needs to identify all transactions which have been integrated
+ into it. Enumerating all of them is clearly not practical, hence we exploit
+ the monotonic nature of identifiers' relationship to Global History. For each
+ valid Local History, we only need to store the last transaction number, as
+ it implies all previous transactions from that Local History have been accepted
+ into Global History. This does not include any Local Histories which have
+ been retired by the Frontend, as any attempt to access their state would
+ mean use-after-free on Frontend's part. The same holds true of any metadata
+ about a Frontend generation -- as soon as the Shard Leader receives a message
+ from a newer generation, it can safely discard any metadata mentioning the
+ prior generation and consider all its identifiers as retired.
+
+ Each DataTreeCandidatePayload entry needs to contain
+ the GlobalTransactionIdentifier of the transaction which has caused the
+ delta carried in that entry. This information is used to summarize transaction
+ metadata when this entry is snapshotted.
+
+=== Internal state persistence
+ The Backend creates a hierarchy of actors to track interactions with the Frontend.
+ Information necessary to recreate these actors needs to be persisted within
+ the journal. This boils down to existence of Local Histories and the last
+ transaction committed for a particular history.
+
+ Whenever a Local History is created on the Frontend, a corresponding actor is
+ created on the Shard Leader. Before the leader acknowledges its creation, it
+ needs to persist a state transition, identifying the Local History. Whenever
+ a Local History is closed down, a corresponding tombstone is stored in the journal.
+ The last transaction identifier for a local history is tracked via snapshots
+ and DataTreeCandidatePayloads.
+
+ In order to minimize overhead on Followers, the actor hierarchy is not created
+ until the actor transitions to Shard Leader state. It only tracks the information
+ required.
+