= Distributed Data Store implementation

[NOTE]
.This is an evolving document
This document is in no sense complete; it describes the intended
implementation. It is subject to revisions as feedback is received and as the
implementation evolves.

== Overview

The implementation builds on `sal-akka-raft` and adds the business logic
needed to create a strongly-consistent, replicated `DataTree`. It does this by
introducing a finite state machine to interact with local and remote clients,
and replicates this state machine using RAFT. The FSM is built around an
internal `InMemoryDataTree` instance, which holds the actual data being
replicated and takes care of performing individual operations (bundled in
transactions) on the data. The implementation exposes an MD-SAL
`DOMDataBroker` interface, which individual applications use, directly or via
the Binding Adaptor, to perform operations.

The implementation is split into two major blocks:

Backend::
The actual implementation of the replicated data tree. It is pivoted around
Shard, a subclass of `RaftActor`, and provides a set of messages for the
*Frontend* to submit work to the Leader, which is then replicated to
Followers.

Frontend::
The MD-SAL layer, which is co-located with the application accessing data
stored in the data store. It is responsible for implementing the MD-SAL
`DOMDataBroker` and related interfaces and translating them into messages
which drive the Backend. It is also responsible for dealing with common
failures occurring in communication with the Backend, such as messages getting
lost and the Shard Leader moving within the system, to the extent reasonably
possible. In the current implementation this maps to a ConcurrentDataBroker
instance.

Both Backend and Frontend share a common Actor System on the local node. Nodes
are connected together using Akka Clustering. There can be at most one
Frontend instance running in an Actor System. For each individual shard, there
can be at most one instance running in an Actor System.

== Concepts

Shard Name::
The logical name of a part of the Conceptual Data Tree. It is associated with
a Shard instance. Each member can run at most one such instance for a
particular name. In the current implementation it is a String, but that may
change to a more structured identifier (such as YangInstanceIdentifier) in
the future.

Transaction::
A set of operations on the data tree which are treated as an atomic unit. All
operations in a transaction share fate, i.e. they either abort or complete
successfully together.

Member Name::
The name of a logical node participating in Akka Clustering; it is synonymous
with an Actor System instance. For deployment purposes it is a simple String,
which uniquely identifies an OpenDaylight instance. This name does not change
for as long as the instance exists. From a deployment perspective, a member
name can be changed only by removing that member and re-adding it. Any
uncommitted transactions from a removed member will be lost.

Global History::
The aggregated history of all transactions successfully committed in the
datastore. It is resilient to the concurrent failure of N members if the
total number of members is (2N + 1).

Local History::
A generalized concept behind a `TransactionChain`. It includes all
transactions committed in the *Global History* at the point when it was
created, plus any transactions submitted in the `TransactionChain`. A Local
History is always tied to a single Frontend incarnation and shares its fate.
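To make the Local History concept concrete, the following is a minimal sketch
of how an application drives one through the MD-SAL `DOMTransactionChain`
interface. Package names and signatures are those of the current MD-SAL and
may change; error handling is omitted for brevity.

[source,java]
----
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;

final class LocalHistoryExample {
    static void useChain(final DOMDataBroker broker, final TransactionChainListener listener,
            final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
        // Creating the chain establishes a new Local History.
        final DOMTransactionChain chain = broker.createTransactionChain(listener);

        final DOMDataWriteTransaction tx1 = chain.newWriteOnlyTransaction();
        tx1.put(LogicalDatastoreType.OPERATIONAL, path, data);
        tx1.submit();

        // tx2 is guaranteed to observe tx1's effects, even before tx1 has
        // been merged into the Global History.
        final DOMDataWriteTransaction tx2 = chain.newWriteOnlyTransaction();
        tx2.merge(LogicalDatastoreType.OPERATIONAL, path, data);
        tx2.submit();

        // Closing the chain retires this Local History.
        chain.close();
    }
}
----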
[NOTE]
.When does a Local History get updated with the Global History, so that it reflects transactions committed from other Local Histories?
The TransactionChain implementations we have today deal with this by rebasing
their Local History when they have no outstanding transactions (i.e. when they
have been fully merged into the Global History). This is problematic under
load: if a TransactionChain is too busy, it never runs out of outstanding
transactions and will keep retaining the view of the Global History as it
existed when the TransactionChain was created -- preventing old state from
ever being garbage-collected.

== Client/Leader locality

Akka Clustering provides location independence for actor message exchanges,
so the data store could be implemented using a single messaging pattern. This
would (and in fact did) result in an implementation which performs reasonably
well. Unfortunately it also prevents us from using features available in
InMemoryDataTree which allow for a better split of responsibilities between
the Frontend and the Backend, by performing many preparatory tasks in the
Frontend thread without the need for message passing. Eliminating messaging
provides a significant improvement in operation throughput, and also in
application developer experience, as invalid operations are detected
immediately in the calling thread rather than being reported only when an
attempt to commit the transaction is made.

With this in mind, the Frontend/Backend interactions have two distinct
patterns: one used when the Leader is known to be local, and the other used
when its locality is either uncertain or known to be remote.

== Failure modes

As noted in the overview section, all Frontend and Backend instances operate
in a single Akka Cluster, which is logically composed of per-member actor
systems. This setup results in multiple distinct failure modes, which need to
be analyzed separately for the ways they are detected, their effects on both
Frontend and Backend state, and the recovery strategies taken.

=== Shard Leader Failure

A shard failure occurs when a Shard actor instance is stopped while it is in
the RAFT leader state, isolated or not. This failure is detected by the Actor
System, which will restart the actor. Upon restart, the Shard starts in the
RAFT follower state, unable to process Frontend transactions as it could
before. Both Frontend and Backend actors can observe message loss.

=== Loss of Actor System connectivity

This failure mode occurs when connectivity between nodes is lost. It is
primarily detected by message timeouts, where messages to and from remote
actors fail to arrive. At the logic level this failure cannot be discerned
from the remote actors being too busy to process the request within the
deadline. When a timeout occurs, the party which sent the request knows
neither whether the request has been processed, nor whether it will be
executed at some point in the future.

=== Shard Leader component actor failure

The Shard implementation makes use of child actors to allow horizontal
scaling, effectively creating a threadpool. Some of these actors track
Frontend state without persisting it, hence the loss of such an actor means
that we have lost some state the Frontend expects us to have. This failure is
detected by the Shard actor.

== Dealing with failure modes

=== Backend

The Backend deals with these failure modes by running RAFT, which uses
heartbeats and reacts accordingly, electing a new leader if necessary. A
leader which cannot communicate with its followers is prevented from making
forward progress. State is reconciled when the partition heals. This is
possible because each state transition in RAFT has a unique identifier (the
journal index).

Shard Leader component actor failures are dealt with in a fail-fast manner: a
tombstone is created for the Frontend component corresponding to the failed
actor. The failure cause is recorded in the tombstone, and all subsequent
Frontend messages towards that actor are immediately rejected, stating the
failure cause.
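The following is a minimal, self-contained sketch of what such tombstoning
could look like, with component identity reduced to a String and the tombstone
to the recorded failure cause; the real bookkeeping is an implementation
detail.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: tracks why a Frontend-facing component actor
// died, so later requests aimed at it can be failed fast.
final class ComponentTombstones {
    private final Map<String, Throwable> tombstones = new HashMap<>();

    void onComponentFailed(final String componentId, final Throwable cause) {
        // The component's unpersisted state is gone; remember why it died.
        tombstones.put(componentId, cause);
    }

    /** Returns the recorded failure cause, or null if the component is live. */
    Throwable checkTombstone(final String componentId) {
        return tombstones.get(componentId);
    }
}
----

Any Frontend message naming a tombstoned component would be answered
immediately with a failure quoting the stored cause, rather than being
processed or silently dropped.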
=== Frontend

The Frontend needs to deal with these failure modes by eliminating unknowns to
the extent reasonably possible, or by propagating any unresolved unknowns to
the user application. In order to achieve this, all state transitions need to
have a globally-unique identifier, and the Backend needs to track which
transitions have been acted on. This way all state transitions can be made
idempotent, i.e. the Backend will ignore any state transition if its internal
state indicates the transition has already been performed. Idempotent
transitions allow the Frontend to re-send them until it receives an
authoritative reply from the Backend -- either confirming the state transition
or rejecting it, hence eliminating the unknown and allowing the Frontend to
continue operating on well-defined state.

== Identifying state transitions

Transition identifiers form a namespace from which each transition needs to
get a unique identifier. This makes allocation a performance-critical part of
operation, which needs to scale. Transitions are always initiated from the
Frontend, hence the responsibility for allocating and retiring each transition
identifier lies ultimately with the Frontend. This matter is slightly
complicated by the fact that the Backend needs to track transitions
efficiently, hence we will define additional rules for when particular
transition identifiers are implicitly retired based on events observed by the
Backend.

Due to the scaling requirement, the Transition Identifier namespace is
hierarchical, where each level in the hierarchy is treated as a separate
namespace, lending itself to delegation.

The first level in the hierarchy is the Member Name where the originating
Frontend resides. Since Member Names are unique within a cluster, this allows
allocation and retirement to be tied to the member lifecycle: allocation and
retirement do not need to be communicated across members. If a member is
removed from a cluster, all its Transition Identifiers are immediately
retired. A Member may not reuse a transition identifier it has used for
communication until it has confirmed that the Backend has acknowledged its
implicit or explicit retirement.

The second level in the hierarchy is the Frontend Generation number. It is
used to distinguish Frontend instances on the same member across Frontend
restarts. While we could use Akka Persistence or similar to persist Frontend
state, all of that state is inherently transient and high-churn, so persisting
it would needlessly degrade performance. Every time a Frontend instance is
started, it is assigned a unique generation number. There are multiple
mechanisms by which such a number could be obtained, such as UUID generation,
but it is advantageous to have this number increase monotonically. For this
reason we will use a locally-persisted number, which is incremented (and
persisted once) every time a Frontend instance is started. We will use an
unsigned 64-bit number, stored as a simple long, but treated as unsigned by
using the Long.*Unsigned*() methods to interact with it.

These two levels are combined into a FrontendIdentifier and embedded in every
message the Frontend sends to the Backend.
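A minimal sketch of what such an identifier could look like; the class shape
and method names are illustrative, not the final API.

[source,java]
----
import java.io.Serializable;
import java.util.Objects;

// Illustrative sketch of a FrontendIdentifier: a Member Name plus an unsigned
// 64-bit generation, manipulated via the Long.*Unsigned*() methods.
final class FrontendIdentifier implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String memberName;   // first level: unique within the cluster
    private final long generation;     // second level: unsigned 64-bit counter

    FrontendIdentifier(final String memberName, final long generation) {
        this.memberName = Objects.requireNonNull(memberName);
        this.generation = generation;
    }

    boolean isNewerThan(final FrontendIdentifier other) {
        // Generations must be compared as unsigned values, hence
        // Long.compareUnsigned() rather than plain comparison operators.
        return memberName.equals(other.memberName)
                && Long.compareUnsigned(generation, other.generation) > 0;
    }

    @Override
    public String toString() {
        return memberName + "-frontend-" + Long.toUnsignedString(generation);
    }
}
----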
The third level in the hierarchy is the Local History identifier, which
identifies a particular Local History managed by the Frontend. This is a
64-bit unsigned integer, unique within a particular Frontend Generation and
monotonically increasing from 1. Zero is treated as a 'no Local History'
marker and is made special only to support free-standing transactions without
the need to define a set of dedicated messages.

The fourth level is the transaction number. This 64-bit number uniquely
identifies a particular transaction within a Local History. It is also
monotonically incremented for each transaction created in the Local History.

These two levels are combined into a LocalTransactionIdentifier, which
uniquely identifies any transaction originating from a Frontend instance.
When combined with the FrontendIdentifier, it also uniquely identifies any
transaction that has ever executed in the cluster, forming a
GlobalTransactionIdentifier.

== Persistence model

=== Frontend persistence

As noted above, the implementation of the Frontend Generation requires each
member to persistently store a single 64-bit counter. To keep the persistence
interfaces within a member consistent, the Frontend uses Akka Persistence to
maintain this counter across restarts.

State for running transactions is kept in the Frontend until the Backend
confirms their completion. If an inconsistency occurs (due to messages
getting lost or the Shard Leader failing), the Frontend will retransmit this
state to the Backend.

=== Backend persistence

The Backend needs to persist and replicate enough state to ensure that Shard
Leader movement does not render Local Histories inoperable, as that has a
direct impact on applications, which would need a state reconciliation
strategy to deal with this situation. Development of complex strategies
cannot be expected from most applications; their developers will implement a
simple strategy of performing a full state dump into the data store, for
example by flapping a BGP session. If the Shard Leader movement was caused by
the leader experiencing performance problems, such a dump could cause the new
Shard Leader to fail for the same reason, leaving the application unable to
make forward progress.

In order to achieve this, the Backend needs to persist both the data stored
by the application and enough metadata about Local Histories and Transactions
to make state recovery across shard leadership movements possible.

==== User Data persistence

The Snapshot needs to identify all transactions which have been integrated
into it. Enumerating all of them is clearly not practical, hence we exploit
the monotonic nature of the identifiers' relationship to the Global History.
For each valid Local History, we only need to store the last transaction
number, as it implies that all previous transactions from that Local History
have been accepted into the Global History. This does not include any Local
Histories which have been retired by the Frontend, as any attempt to access
their state would constitute a use-after-free on the Frontend's part. The
same holds true of any metadata about a Frontend generation -- as soon as the
Shard Leader receives a message from a newer generation, it can safely
discard any metadata mentioning the prior generation and consider all its
identifiers as retired.

Each DataTreeCandidatePayload entry needs to contain the
GlobalTransactionIdentifier of the transaction which has caused the delta
carried in that entry. This information is used to summarize transaction
metadata when the entry is snapshotted.
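A minimal sketch of how that summarization could work, keyed by the Local
History identifier; the class is illustrative only.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: per-history metadata folded into a snapshot. Monotonic
// allocation means the last transaction number implies all lower-numbered
// transactions from that Local History are included as well.
final class HistoryMetadata {
    private final Map<Long, Long> lastTransaction = new HashMap<>();

    void onPayloadApplied(final long historyId, final long transactionNumber) {
        // Transaction numbers increase monotonically within a history, so the
        // latest payload always carries the new maximum.
        lastTransaction.put(historyId, transactionNumber);
    }

    void onHistoryRetired(final long historyId) {
        // A retired history must never be referenced again; drop its metadata
        // so the snapshot does not keep it alive.
        lastTransaction.remove(historyId);
    }
}
----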
==== Internal state persistence

The Backend creates a hierarchy of actors to track interactions with the
Frontend. Information necessary to recreate these actors needs to be
persisted in the journal. This boils down to the existence of Local Histories
and the last transaction committed for a particular history.

Whenever a Local History is created on the Frontend, a corresponding actor is
created on the Shard Leader. Before the leader acknowledges its creation, it
needs to persist a state transition identifying the Local History. Whenever a
Local History is closed down, a corresponding tombstone is stored in the
journal. The last transaction identifier for a Local History is tracked via
snapshots and DataTreeCandidatePayloads.

In order to minimize overhead on Followers, the actor hierarchy is not
created until an actor transitions to the Shard Leader state; until then,
only the information required to create it is tracked.

== Actor model

Both Backend and Frontend are implemented as a set of cooperating actors.
This section outlines their structure and how it relates to user application
interactions.

=== Frontend actor model

The current implementation does not have explicit actors, which means that
every interaction with the Backend is done via Patterns.ask(), which creates
implicit actors. This model proposes to change that, making the actors
explicit -- allowing us to better track the state of interactions.

==== FrontendActor

The FrontendActor acts as the single, root-level actor. It is tied to a
ConcurrentDataBroker instance. Its responsibility is to maintain the
generation number (hence it is a PersistentActor), talk to the ShardManager,
and propagate information about Shard Leader locations towards its child
actors. It maintains a set of LocalHistoryActors and a set of
SingleTransactionActors.

==== LocalHistoryActor

A LocalHistoryActor is tied to a DOMTransactionChain, maintains all state
local to it, and also tracks its propagation to the Shard Leader. This is not
a persistent actor. It maintains a relationship with its Backend counterpart
and routes operations (remote) and transactions (local) towards it.

Requests sent from the Frontend to the LocalHistoryActor are subject to
timeouts. If a timeout occurs, the Frontend is expected to perform full
Backend leader discovery, contact the leader and reconcile transaction state
with it.

In order to minimize messaging latency while maintaining resiliency against
message loss (due to TCP connectivity issues), there are two messaging
patterns:

* The Frontend talks to the Shard. This is used during instantiation of a
Local History, as well as for state reconciliation after a retriable failure
has occurred (such as a request timeout).
* The Frontend talks to the Local History Actor. This is used during
transaction operation, i.e. when a transaction is being built up. Once the
transaction is readied, the LHA will send a message to the Shard, which takes
ownership of that transaction's state. Since this handoff is not observable
by the Frontend (that would require an additional round-trip), the Shard
replicates its responses to the LHA. In case a timeout occurs, the Frontend
will contact the LHA (as that is the last endpoint it knows about), which
will replay the Shard's message.

Messaging between a Local History Actor and its associated Shard is expected
to be reliable. This is guaranteed by the parent/child relationship, where
the Shard is the parent. If a Shard fails, all of its children will be
restarted, losing all state. If an LHA fails, the Shard will be notified via
the Akka DeathWatch pattern.
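A minimal sketch of this parent/child arrangement, assuming Akka's untyped
Java actor API; the `LocalHistoryActor` class and the Shard's internals are
illustrative only.

[source,java]
----
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

// Illustrative shapes only; the real Shard carries much more state.
class Shard extends UntypedActor {
    ActorRef createLocalHistoryActor(final long historyId) {
        // The Shard is the parent of every LHA it creates and watches it,
        // so an LHA failure is delivered back as a Terminated message.
        final ActorRef lha = getContext().actorOf(
            Props.create(LocalHistoryActor.class, historyId));
        getContext().watch(lha);
        return lha;
    }

    @Override
    public void onReceive(final Object message) {
        if (message instanceof Terminated) {
            // An LHA has terminated: record a tombstone for the corresponding
            // Frontend component so its messages can be rejected fast.
        }
        // ... normal Shard message handling ...
    }
}
----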
==== SingleTransactionActor

A SingleTransactionActor takes care of transactions which are not tied to a
Local History, i.e. free-standing transactions instantiated via
DOMDataBroker.newXXXTransaction(). These actors are instantiated separately
for each transaction and are terminated once the transaction completes,
successfully or not.
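A minimal sketch of such a one-shot actor, again assuming the untyped Akka
Java API; `TransactionCompleted` is a stand-in for whatever completion
notification the implementation ends up using.

[source,java]
----
import akka.actor.UntypedActor;

// Illustrative only: a one-shot actor for a free-standing transaction, which
// per the identifier scheme above would carry the reserved 'no Local History'
// value of zero in its LocalTransactionIdentifier.
class SingleTransactionActor extends UntypedActor {
    @Override
    public void onReceive(final Object message) {
        if (message instanceof TransactionCompleted) {
            // The transaction has finished, successfully or not; this actor
            // has served its purpose and stops itself.
            getContext().stop(getSelf());
        }
        // ... otherwise track and forward the transaction's requests ...
    }
}
----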