+ /**
+ * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+ * SchemaContexts match and does not perform any pruning.
+ *
+ * @param identifier Payload identifier as returned from RaftActor
+ * @param payload Payload
+ * @throws IOException when the snapshot fails to deserialize
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ DataValidationFailedException {
+ /*
+ * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+ * if we are the leader and it has originated with us.
+ *
+ * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+ * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+ * acting in that capacity anymore.
+ *
+ * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+ * pre-Boron state -- which limits the number of options here.
+ */
+ if (payload instanceof CommitTransactionPayload) {
+ if (identifier == null) {
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
+ applyReplicatedCandidate(e.getKey(), e.getValue());
+ allMetadataCommittedTransaction(e.getKey());
+ } else {
+ Verify.verify(identifier instanceof TransactionIdentifier);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
+ }
+ } else if (payload instanceof AbortTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((AbortTransactionPayload) payload);
+ } else {
+ allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof PurgeTransactionPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeTransactionPayload) payload);
+ } else {
+ allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof CloseLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+ } else {
+ allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof CreateLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+ } else {
+ allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
+ }
+ } else if (payload instanceof PurgeLocalHistoryPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+ } else {
+ allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ }
+ } else {
+ LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+ }
+ }
+
+ private void replicatePayload(final Identifier id, final Payload payload, @Nullable final Runnable callback) {
+ if (callback != null) {
+ replicationCallbacks.put(payload, callback);
+ }
+ shard.persistPayload(id, payload, true);
+ }
+
+ private void payloadReplicationComplete(final AbstractIdentifiablePayload<?> payload) {
+ final Runnable callback = replicationCallbacks.remove(payload);
+ if (callback != null) {
+ LOG.debug("{}: replication of {} completed, invoking {}", logContext, payload.getIdentifier(), callback);
+ callback.run();
+ } else {
+ LOG.debug("{}: replication of {} has no callback", logContext, payload.getIdentifier());
+ }
+ }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingFinishCommits.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionAborted(txId);
+ }
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionCommitted(txId);
+ }
+ }
+
+ private void allMetadataPurgedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionPurged(txId);
+ }
+ }
+
+ private void allMetadataCreatedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryCreated(historyId);
+ }
+ }
+
+ private void allMetadataClosedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryClosed(historyId);
+ }
+ }
+
+ private void allMetadataPurgedLocalHistory(final LocalHistoryIdentifier historyId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onHistoryPurged(historyId);
+ }
+ }
+
+ /**
+ * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
+ * this method is used for re-establishing state when we are taking over
+ *
+ * @param historyId Local history identifier
+ * @param closed True if the chain should be created in closed state (i.e. pending purge)
+ * @return Transaction chain handle
+ */
+ ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final boolean closed) {
+ final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
+ final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
+ Preconditions.checkState(existing == null, "Attempted to recreate chain %s, but %s already exists", historyId,
+ existing);
+ return ret;
+ }
+
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+ ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
+ if (chain == null) {
+ chain = new ShardDataTreeTransactionChain(historyId, this);
+ transactionChains.put(historyId, chain);
+ shard.persistPayload(historyId, CreateLocalHistoryPayload.create(historyId), true);
+ }
+
+ return chain;
+ }
+
+ ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
+ }
+
+ return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+ }
+
+ ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
+ .newModification());
+ }
+
+ return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+ }
+
+ @VisibleForTesting
+ public void notifyListeners(final DataTreeCandidate candidate) {
+ treeChangeListenerPublisher.publishChanges(candidate, logContext);
+ dataChangeListenerPublisher.publishChanges(candidate, logContext);
+ }
+
+ void notifyOfInitialData(final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> listenerReg, final Optional<DataTreeCandidate> currentState) {
+ if (currentState.isPresent()) {
+ ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
+ localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
+ listenerReg.getScope());
+ localPublisher.publishChanges(currentState.get(), logContext);
+ }
+ }
+
+ void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final Optional<DataTreeCandidate> currentState) {
+ if (currentState.isPresent()) {
+ ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
+ localPublisher.registerTreeChangeListener(path, listener);
+ localPublisher.publishChanges(currentState.get(), logContext);
+ }
+ }
+
+ /**
+ * Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
+ * replication callbacks.
+ */
+ void purgeLeaderState() {
+ for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
+ chain.close();
+ }
+
+ transactionChains.clear();
+ replicationCallbacks.clear();
+ }
+
+ /**
+ * Close a single transaction chain.
+ *
+ * @param id History identifier
+ * @param callback Callback to invoke upon completion, may be null
+ */
+ void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ chain.close();
+ replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ }
+
+ /**
+ * Purge a single transaction chain.
+ *
+ * @param id History identifier
+ * @param callback Callback to invoke upon completion, may be null
+ */
+ void purgeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
+ if (chain == null) {
+ LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ }
+
+ Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
+ final DataChangeScope scope) {
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
+
+ return new SimpleEntry<>(reg, readCurrentData());
+ }
+
+ private Optional<DataTreeCandidate> readCurrentData() {
+ final Optional<NormalizedNode<?, ?>> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
+ YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
+ }
+
+ public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
+
+ return new SimpleEntry<>(reg, readCurrentData());
+ }
+
+ int getQueueSize() {
+ return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
+ }
+