import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
* e.g. it does not expose public interfaces and assumes it is only ever called from a
* single thread.
*
+ * <p>
* This class is not part of the API contract and is subject to change at any time.
*/
@NotThreadSafe
new DefaultShardDataChangeListenerPublisher(), "");
}
- String logContext() {
+ final String logContext() {
return logContext;
}
+ final Ticker ticker() {
+ return shard.ticker();
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
return schemaContext;
}
- void updateSchemaContext(final SchemaContext schemaContext) {
- dataTree.setSchemaContext(schemaContext);
- this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
/**
return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
- private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+ private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
final Stopwatch elapsed = Stopwatch.createStarted();
final DataTreeModification unwrapped = unwrap(mod);
dataTree.validate(unwrapped);
- dataTree.commit(dataTree.prepare(unwrapped));
+ DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+
LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
}
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
return new PruningDataTreeModification(delegate, dataTree, schemaContext);
}
applySnapshot(snapshot, this::wrapWithPruning);
}
-
- /**
- * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
- * does not perform any pruning.
- *
- * @param snapshot Snapshot that needs to be applied
- * @throws DataValidationFailedException when the snapshot fails to apply
- */
- void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, UnaryOperator.identity());
- }
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
*/
void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else if (payload instanceof DataTreeCandidatePayload) {
applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
} else {
- LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+ LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
}
*/
if (payload instanceof CommitTransactionPayload) {
if (identifier == null) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyReplicatedCandidate(e.getKey(), e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else {
}
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
final DataChangeScope scope) {
- final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
private Optional<DataTreeCandidate> readCurrentData() {
- final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ final Optional<NormalizedNode<?, ?>> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
- public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
- final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
- path, listener);
+ public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getId(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
}
/**
+ * Commits a modification.
+ *
* @deprecated This method violates DataTree containment and will be removed.
*/
@VisibleForTesting
public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
- for(CommitEntry entry: pendingTransactions) {
+ for (CommitEntry entry: pendingTransactions) {
ret.add(entry.cohort);
}
return ret;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void processNextTransaction() {
while (!pendingTransactions.isEmpty()) {
final CommitEntry entry = pendingTransactions.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ if (cohort.getState() != State.CAN_COMMIT_PENDING) {
break;
}
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation.", e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
final TransactionIdentifier txId = cohort.getIdentifier();
final DataTreeCandidate candidate = cohort.getCandidate();
cohortRegistry.process(sender, message);
}
+ @Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
return cohort;
}
+ @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"},
+ justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
break;
case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
break;
case PRE_COMMIT_PENDING:
case FAILED:
case READY:
default:
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we just want to drop the current entry that expired and thus ignore the return value.
+ // In fact we really shouldn't hit this case but we handle all enums for completeness.
pendingTransactions.poll();
}
}
}
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "See inline comment below.")
void startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = pendingTransactions.iterator();
if (!it.hasNext()) {
if (cohort.getState() != State.COMMIT_PENDING) {
LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
+
+ // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
+ // this case, we've already obtained the head of the queue above via the Iterator and we just want to
+ // remove it here.
pendingTransactions.poll();
processNextTransaction();
} else {
}
private void maybeRunOperationOnPendingTransactionsComplete() {
- if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
- LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
- runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
- }
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}