+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = requireNonNull(newSchemaContext);
+ this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+ }
+
+ void resetTransactionBatch() {
+ currentTransactionBatch = 0;
+ }
+
+ /**
+ * Take a snapshot of current state for later recovery.
+ *
+ * @return A state snapshot
+ */
+ @NonNull ShardDataTreeSnapshot takeStateSnapshot() {
+ final NormalizedNode<?, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+ final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
+ ImmutableMap.builder();
+
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ final ShardDataTreeSnapshotMetadata<?> meta = m.toSnapshot();
+ if (meta != null) {
+ metaBuilder.put(meta.getType(), meta);
+ }
+ }
+
+ return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
+ }
+
+ private boolean anyPendingTransactions() {
+ return !pendingTransactions.isEmpty() || !pendingCommits.isEmpty() || !pendingFinishCommits.isEmpty();
+ }
+
+ private void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot,
+ final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
+ final Stopwatch elapsed = Stopwatch.createStarted();
+
+ if (anyPendingTransactions()) {
+ LOG.warn("{}: applying state snapshot with pending transactions", logContext);
+ }
+
+ final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> snapshotMeta;
+ if (snapshot instanceof MetadataShardDataTreeSnapshot) {
+ snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+ } else {
+ snapshotMeta = ImmutableMap.of();
+ }
+
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ final ShardDataTreeSnapshotMetadata<?> s = snapshotMeta.get(m.getSupportedType());
+ if (s != null) {
+ m.applySnapshot(s);
+ } else {
+ m.reset();
+ }
+ }
+
+ final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final DataTreeModification mod = wrapper.apply(unwrapped);
+ // delete everything first
+ mod.delete(YangInstanceIdentifier.empty());
+
+ final Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+ if (maybeNode.isPresent()) {
+ // Add everything from the remote node back
+ mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
+ }
+ mod.ready();
+
+ dataTree.validate(unwrapped);
+ DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+
+ LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
+ }
+
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ // TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
+ /**
+ * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ if (snapshot.needsMigration()) {
+ final ReusableNormalizedNodePruner uintPruner = pruner.withUintAdaption();
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Proactive(delegate, dataTree, uintPruner));
+ } else {
+ applySnapshot(snapshot.getSnapshot(),
+ delegate -> new PruningDataTreeModification.Reactive(delegate, dataTree, pruner));
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
+ final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
+ final PruningDataTreeModification mod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
+
+ DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
+ mod.ready();
+ LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+ try {
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-payload-" + logContext + ".out");
+ DataTreeModificationOutput.toFile(file, unwrapped);
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s",
+ logContext, file), e);
+ }
+
+ allMetadataCommittedTransaction(entry.getKey());
+ }
+
+ private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
+ final boolean uintAdapting) {
+ // TODO: we should be able to reuse the pruner, provided we are not reentrant
+ final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
+ dataSchemaContext);
+ return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
+ : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
+ }
+
+ /**
+ * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
+ if (payload instanceof CommitTransactionPayload) {
+ applyRecoveryCandidate((CommitTransactionPayload) payload);
+ } else if (payload instanceof AbortTransactionPayload) {
+ allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeTransactionPayload) {
+ allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+ } else if (payload instanceof CreateLocalHistoryPayload) {
+ allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeLocalHistoryPayload) {
+ allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else {
+ LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
+ }
+ }
+
+ private void applyReplicatedCandidate(final CommitTransactionPayload payload)
+ throws DataValidationFailedException, IOException {
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
+ final TransactionIdentifier identifier = entry.getKey();
+ LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+ final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ // TODO: check version here, which will enable us to perform forward-compatibility transformations
+ DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
+ mod.ready();
+
+ LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
+
+ allMetadataCommittedTransaction(identifier);
+ notifyListeners(candidate);
+ }
+
+ /**
+ * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+ * SchemaContexts match and does not perform any pruning.
+ *
+ * @param identifier Payload identifier as returned from RaftActor
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ DataValidationFailedException {
+ /*
+ * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+ * if we are the leader and it has originated with us.
+ *
+ * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+ * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+ * acting in that capacity anymore.
+ *
+ * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+ * pre-Boron state -- which limits the number of options here.
+ */
+ if (payload instanceof CommitTransactionPayload) {
+ if (identifier == null) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ } else {
+ verify(identifier instanceof TransactionIdentifier);
+ // if we did not track this transaction before, it means that it came from another leader and we are in
+ // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+ // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+ if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ }
+ }
+ } else if (payload instanceof AbortTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((AbortTransactionPayload) payload);
+ }
+ allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeTransactionPayload) payload);
+ }
+ allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+ }
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof CreateLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+ }
+ allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof PurgeLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+ }
+ allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else {
+ LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+ }
+ }
+
+ private void replicatePayload(final Identifier id, final Payload payload, final @Nullable Runnable callback) {
+ if (callback != null) {
+ replicationCallbacks.put(payload, callback);
+ }
+ shard.persistPayload(id, payload, true);
+ }
+
+ private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+ final Runnable callback = replicationCallbacks.remove(payload);
+ if (callback != null) {
+ LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+ callback.run();
+ } else {
+ LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+ }
+ }
+
+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingFinishCommits.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
+ return false;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
+ return false;
+ }
+
+ finishCommit(current.cohort);
+ return true;
+ }
+
+ private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionAborted(txId);
+ }
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionCommitted(txId);
+ }
+ }
+
+ private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionPurged(txId);
+ }
+ }
+
+ private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryCreated(historyId);
+ }
+ }
+
+ private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryClosed(historyId);
+ }
+ }
+
+ private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryPurged(historyId);
+ }
+ }
+
+ /**
+ * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+ * this method is used for re-establishing state when we are taking over
+ *
+ * @param historyId Local history identifier
+ * @param closed True if the chain should be created in closed state (i.e. pending purge)
+ * @return Transaction chain handle
+ */
+ ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final boolean closed) {
+ final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+ final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+ checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId, existing);
+ return ret;