+ final java.util.Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+ if (maybeNode.isPresent()) {
+ // Add everything from the remote node back
+ mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+ }
+ mod.ready();
+
+ final DataTreeModification unwrapped = unwrap(mod);
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
+ }
+
+ private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
+ return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+ }
+
+ private static DataTreeModification unwrap(final DataTreeModification modification) {
+ if (modification instanceof PruningDataTreeModification) {
+ return ((PruningDataTreeModification)modification).delegate();
+ }
+ return modification;
+ }
+
+ /**
+ * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, this::wrapWithPruning);
+ }
+
+
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
+ private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+ final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
+ DataTreeCandidates.applyToModification(mod, candidate);
+ mod.ready();
+
+ final DataTreeModification unwrapped = mod.delegate();
+ LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+ try {
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-payload-" + logContext + ".out");
+ DataTreeModificationOutput.toFile(file, unwrapped);
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s",
+ logContext, file), e);
+ }
+ }
+
+ /**
+ * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+ if (payload instanceof CommitTransactionPayload) {
+ final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ applyRecoveryCandidate(e.getValue());
+ allMetadataCommittedTransaction(e.getKey());
+ } else if (payload instanceof DataTreeCandidatePayload) {
+ applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
+ } else {
+ LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+ }
+ }
+
+ private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ throws DataValidationFailedException {
+ LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+ final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ DataTreeCandidates.applyToModification(mod, foreign);
+ mod.ready();
+
+ LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
+
+ notifyListeners(candidate);
+ }
+
+ /**
+ * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+ * SchemaContexts match and does not perform any pruning.
+ *
+ * @param identifier Payload identifier as returned from RaftActor
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ DataValidationFailedException {
+ /*
+ * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+ * if we are the leader and it has originated with us.
+ *
+ * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+ * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+ * acting in that capacity anymore.
+ *
+ * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+ * pre-Boron state -- which limits the number of options here.
+ */
+ if (payload instanceof CommitTransactionPayload) {
+ if (identifier == null) {
+ final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ applyReplicatedCandidate(e.getKey(), e.getValue());
+ allMetadataCommittedTransaction(e.getKey());
+ } else {
+ Verify.verify(identifier instanceof TransactionIdentifier);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
+ }
+ } else {
+ LOG.debug("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+ }
+ }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingTransactions.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.transactionCommitted(txId);
+ }