From 6276a65120a674b545ea787a5e1d9311bcdbf2af Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 1 Aug 2016 23:20:11 +0200 Subject: [PATCH] BUG-5280: persist metadata in snaphots This patch adds the wiring in ShardDataTree to persist various pieces of metadata in a snapshot. It also includes metadata recovery from a snapshot. In order to make this work, this patch centralizes all actual payload and snapshot handling within the ShardDataTree by introducing explicit entrypoints for each avenue through which data can be introduced. Change-Id: Ibc15bd152bd44dd583d67bb7fc61bc8f3086df30 Signed-off-by: Robert Varga --- .../datastore/DataTreeCandidatePayload.java | 14 +- .../controller/cluster/datastore/Shard.java | 20 +- .../cluster/datastore/ShardDataTree.java | 287 +++++++++++++----- .../datastore/ShardDataTreeMetadata.java | 33 ++ .../datastore/ShardRecoveryCoordinator.java | 98 ++---- .../datastore/ShardSnapshotCohort.java | 46 +-- .../datastore/SimpleShardDataTreeCohort.java | 12 +- .../persisted/CommitTransactionPayload.java | 8 +- .../persisted/DataTreeCandidateSupplier.java | 25 -- .../MetadataShardDataTreeSnapshot.java | 23 +- .../ShardDataTreeSnapshotMetadata.java | 4 +- .../utils/PruningDataTreeModification.java | 19 +- .../DataTreeCandidatePayloadTest.java | 12 +- .../cluster/datastore/ShardTest.java | 51 ++-- .../persisted/ShardDataTreeSnapshotTest.java | 10 +- .../PruningDataTreeModificationTest.java | 2 +- 16 files changed, 379 insertions(+), 285 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java index 25a7ee8622..a971ee6ad5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java @@ -14,12 +14,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; -import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -27,7 +22,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; * @deprecated Deprecated in Boron in favor of CommitTransactionPayload */ @Deprecated -final class DataTreeCandidatePayload extends Payload implements DataTreeCandidateSupplier, Externalizable { +final class DataTreeCandidatePayload extends Payload implements Externalizable { private static final long serialVersionUID = 1L; private transient byte[] serialized; @@ -55,11 +50,8 @@ final class DataTreeCandidatePayload extends Payload implements DataTreeCandidat return new DataTreeCandidatePayload(out.toByteArray()); } - - @Override - public Entry, DataTreeCandidate> getCandidate() throws IOException { - return new SimpleImmutableEntry<>(Optional.empty(), - DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized))); + public DataTreeCandidate getCandidate() throws IOException { + return DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 3fe349f798..fbd7c89b6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -569,21 +568,14 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { - if (data instanceof DataTreeCandidateSupplier) { - if (clientActor == null) { - // No clientActor indicates a replica coming from the leader - try { - store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data); - } catch (DataValidationFailedException | IOException e) { - LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); - } - } else { - // Replication consensus reached, proceed to commit - store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data); + if (data instanceof Payload) { + try { + store.applyReplicatedPayload(identifier, (Payload)data); + } catch (DataValidationFailedException | IOException e) { + LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); } } else { - LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data, - data.getClass().getClassLoader()); + LOG.error("{}: Unknown state for {} received {}", persistenceId(), identifier, data); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 89fa8fbc25..f1d37872fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -12,7 +12,11 @@ import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.primitives.UnsignedLong; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; @@ -27,15 +31,18 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -90,6 +97,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { private final Queue pendingTransactions = new ArrayDeque<>(); private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; + private final Collection> metadata; private final TipProducingDataTree dataTree; private final String logContext; private final Shard shard; @@ -99,14 +107,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree, final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, - final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { - this.dataTree = dataTree; + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext, + final ShardDataTreeMetadata... metadata) { + this.dataTree = Preconditions.checkNotNull(dataTree); updateSchemaContext(schemaContext); this.shard = Preconditions.checkNotNull(shard); this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher); this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher); this.logContext = Preconditions.checkNotNull(logContext); + this.metadata = ImmutableList.copyOf(metadata); } public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType, @@ -139,18 +149,205 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { this.schemaContext = Preconditions.checkNotNull(schemaContext); } - ShardDataTreeSnapshot takeRecoverySnapshot() { - return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get()); + /** + * Take a snapshot of current state for later recovery. + * + * @return A state snapshot + */ + @Nonnull ShardDataTreeSnapshot takeStateSnapshot() { + final NormalizedNode rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get(); + final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = + ImmutableMap.builder(); + + for (ShardDataTreeMetadata m : metadata) { + final ShardDataTreeSnapshotMetadata meta = m.toStapshot(); + if (meta != null) { + metaBuilder.put(meta.getType(), meta); + } + } + + return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build()); } - void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException { - // FIXME: purge any outstanding transactions + private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot, + final UnaryOperator wrapper) throws DataValidationFailedException { + final Stopwatch elapsed = Stopwatch.createStarted(); - final DataTreeModification snapshot = transaction.getSnapshot(); - snapshot.ready(); + if (!pendingTransactions.isEmpty()) { + LOG.warn("{}: applying state snapshot with pending transactions", logContext); + } + + final Map>, ShardDataTreeSnapshotMetadata> snapshotMeta; + if (snapshot instanceof MetadataShardDataTreeSnapshot) { + snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata(); + } else { + snapshotMeta = ImmutableMap.of(); + } + + for (ShardDataTreeMetadata m : metadata) { + final ShardDataTreeSnapshotMetadata s = snapshotMeta.get(m.getSupportedType()); + if (s != null) { + m.applySnapshot(s); + } else { + m.reset(); + } + } + + final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification()); + // delete everything first + mod.delete(YangInstanceIdentifier.EMPTY); + + final java.util.Optional> maybeNode = snapshot.getRootNode(); + if (maybeNode.isPresent()) { + // Add everything from the remote node back + mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get()); + } + mod.ready(); + + final DataTreeModification unwrapped = unwrap(mod); + dataTree.validate(unwrapped); + dataTree.commit(dataTree.prepare(unwrapped)); + LOG.debug("{}: state snapshot applied in %s", logContext, elapsed); + } + + private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) { + return new PruningDataTreeModification(delegate, dataTree, schemaContext); + } + + private static DataTreeModification unwrap(final DataTreeModification modification) { + if (modification instanceof PruningDataTreeModification) { + return ((PruningDataTreeModification)modification).delegate(); + } + return modification; + } + + /** + * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data + * pruning in an attempt to adjust the state to our current SchemaContext. + * + * @param snapshot Snapshot that needs to be applied + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + applySnapshot(snapshot, this::wrapWithPruning); + } - dataTree.validate(snapshot); - dataTree.commit(dataTree.prepare(snapshot)); + + /** + * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and + * does not perform any pruning. + * + * @param snapshot Snapshot that needs to be applied + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException { + applySnapshot(snapshot, UnaryOperator.identity()); + } + + private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException { + final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification()); + DataTreeCandidates.applyToModification(mod, candidate); + mod.ready(); + + final DataTreeModification unwrapped = mod.delegate(); + LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped); + + dataTree.validate(unwrapped); + dataTree.commit(dataTree.prepare(unwrapped)); + } + + /** + * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data + * pruning in an attempt to adjust the state to our current SchemaContext. + * + * @param payload Payload + * @throws IOException when the snapshot fails to deserialize + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException { + if (payload instanceof CommitTransactionPayload) { + final Entry e = ((CommitTransactionPayload) payload).getCandidate(); + applyRecoveryCandidate(e.getValue()); + allMetadataCommittedTransaction(e.getKey()); + } else if (payload instanceof DataTreeCandidatePayload) { + applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate()); + } else { + LOG.warn("{}: ignoring unhandled payload {}", logContext, payload); + } + } + + private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign) + throws DataValidationFailedException { + LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); + + final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + DataTreeCandidates.applyToModification(mod, foreign); + mod.ready(); + + LOG.trace("{}: Applying foreign modification {}", logContext, mod); + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); + + notifyListeners(candidate); + } + + /** + * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower + * SchemaContexts match and does not perform any pruning. + * + * @param identifier Payload identifier as returned from RaftActor + * @param payload Payload + * @throws IOException when the snapshot fails to deserialize + * @throws DataValidationFailedException when the snapshot fails to apply + */ + void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException, + DataValidationFailedException { + /* + * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload + * if we are the leader and it has originated with us. + * + * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately, + * though, this may not be the case anymore, as we are being called some time afterwards and we may not be + * acting in that capacity anymore. + * + * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe + * pre-Boron state -- which limits the number of options here. + */ + if (payload instanceof CommitTransactionPayload) { + if (identifier == null) { + final Entry e = ((CommitTransactionPayload) payload).getCandidate(); + applyReplicatedCandidate(e.getKey(), e.getValue()); + allMetadataCommittedTransaction(e.getKey()); + } else { + Verify.verify(identifier instanceof TransactionIdentifier); + payloadReplicationComplete((TransactionIdentifier) identifier); + } + } else { + LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload); + } + } + + private void payloadReplicationComplete(final TransactionIdentifier txId) { + final CommitEntry current = pendingTransactions.peek(); + if (current == null) { + LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); + return; + } + + if (!current.cohort.getIdentifier().equals(txId)) { + LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, + current.cohort.getIdentifier(), txId); + return; + } + + finishCommit(current.cohort); + } + + private void allMetadataCommittedTransaction(final TransactionIdentifier txId) { + for (ShardDataTreeMetadata m : metadata) { + m.transactionCommitted(txId); + } } private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { @@ -249,20 +446,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return pendingTransactions.size(); } - void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { - LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); - - final DataTreeModification mod = dataTree.takeSnapshot().newModification(); - DataTreeCandidates.applyToModification(mod, foreign); - mod.ready(); - - LOG.trace("{}: Applying foreign modification {}", logContext, mod); - dataTree.validate(mod); - final DataTreeCandidate candidate = dataTree.prepare(mod); - dataTree.commit(candidate); - notifyListeners(candidate); - } - @Override void abortTransaction(final AbstractShardDataTreeTransaction transaction) { // Intentional no-op @@ -288,12 +471,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return dataTree.takeSnapshot().newModification(); } + /** + * @deprecated This method violates DataTree containment and will be removed. + */ @VisibleForTesting - // FIXME: This should be removed, it violates encapsulation + @Deprecated public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException { modification.ready(); dataTree.validate(modification); - DataTreeCandidateTip candidate = dataTree.prepare(modification); + DataTreeCandidate candidate = dataTree.prepare(modification); dataTree.commit(candidate); return candidate; } @@ -404,24 +590,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); try { - try { - dataTree.commit(candidate); - } catch (IllegalStateException e) { - // We may get a "store tree and candidate base differ" IllegalStateException from commit under - // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last - // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before - // applying it to the state. We then become the leader and a second tx is pre-committed and - // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign - // candidate via applyState prior to the second tx. Since the second tx has already been - // pre-committed, when it gets here to commit it will get an IllegalStateException. - - // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner - // solution will be forthcoming. - - LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e); - applyForeignCandidate(txId, candidate); - } + dataTree.commit(candidate); } catch (Exception e) { + LOG.error("{}: Failed to commit transaction {}", logContext, txId, e); failCommit(e); return; } @@ -430,7 +601,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis()); // FIXME: propagate journal index - pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO); LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId); @@ -468,28 +638,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId); } - private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) { - final CommitEntry current = pendingTransactions.peek(); - if (current == null) { - LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); - return; - } - - if (!current.cohort.getIdentifier().equals(txId)) { - LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext, - current.cohort.getIdentifier(), txId); - return; - } - - finishCommit(current.cohort); - } - - void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) { - // For now we do not care about anything else but transactions - Verify.verify(identifier instanceof TransactionIdentifier); - payloadReplicationComplete((TransactionIdentifier)identifier, payload); - } - void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) { cohortRegistry.process(sender, message); } @@ -502,11 +650,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return cohort; } - void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload) - throws DataValidationFailedException, IOException { - applyForeignCandidate(identifier, payload.getCandidate().getValue()); - } - void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) { final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis); final long now = shard.ticker().read(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java new file mode 100644 index 0000000000..8a62a2bae0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Verify; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; + +abstract class ShardDataTreeMetadata> { + final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata snapshot) { + Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot, + getSupportedType()); + doApplySnapshot(getSupportedType().cast(snapshot)); + } + + abstract void reset(); + + abstract void doApplySnapshot(@Nonnull T snapshot); + + abstract @Nonnull Class getSupportedType(); + + abstract @Nullable T toStapshot(); + + // Lifecycle events + abstract void transactionCommitted(TransactionIdentifier txId); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index c533759193..70a701075e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -10,22 +10,11 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import java.io.File; -import java.io.IOException; -import java.util.Map.Entry; -import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; -import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput; -import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; /** @@ -41,71 +30,51 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private final ShardDataTree store; private final String shardName; private final Logger log; - private PruningDataTreeModification transaction; - private int size; private final byte[] restoreFromSnapshot; - ShardRecoveryCoordinator(ShardDataTree store, byte[] restoreFromSnapshot, String shardName, Logger log) { + private boolean open; + + ShardRecoveryCoordinator(final ShardDataTree store, final byte[] restoreFromSnapshot, final String shardName, final Logger log) { this.store = Preconditions.checkNotNull(store); - this.restoreFromSnapshot = restoreFromSnapshot; this.shardName = Preconditions.checkNotNull(shardName); this.log = Preconditions.checkNotNull(log); + + this.restoreFromSnapshot = restoreFromSnapshot; } @Override - public void startLogRecoveryBatch(int maxBatchSize) { + public void startLogRecoveryBatch(final int maxBatchSize) { log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); - transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(), - store.getSchemaContext()); - size = 0; + open = true; } @Override - public void appendRecoveredLogEntry(Payload payload) { - Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry"); + public void appendRecoveredLogEntry(final Payload payload) { + Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry"); try { - if (payload instanceof DataTreeCandidateSupplier) { - final Entry, DataTreeCandidate> e = - ((DataTreeCandidateSupplier)payload).getCandidate(); - - DataTreeCandidates.applyToModification(transaction, e.getValue()); - size++; - - if (e.getKey().isPresent()) { - // FIXME: BUG-5280: propagate transaction state - } - } else { - log.error("{}: Unknown payload {} received during recovery", shardName, payload); - } - } catch (IOException e) { - log.error("{}: Error extracting payload", shardName, e); + store.applyRecoveryPayload(payload); + } catch (Exception e) { + log.error("{}: failed to apply payload {}", shardName, payload, e); + throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s", + shardName, payload), e); } } - private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException { - store.commit(tx.getResultingModification()); - } - /** * Applies the current batched log entries to the data store. */ @Override public void applyCurrentLogRecoveryBatch() { - Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch"); + Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch"); + open = false; + } - log.debug("{}: Applying current log recovery batch with size {}", shardName, size); - try { - commitTransaction(transaction); - } catch (Exception e) { - File file = new File(System.getProperty("karaf.data", "."), - "failed-recovery-batch-" + shardName + ".out"); - DataTreeModificationOutput.toFile(file, transaction.getResultingModification()); - throw new RuntimeException(String.format( - "%s: Failed to apply recovery batch. Modification data was written to file %s", - shardName, file), e); - } - transaction = null; + private File writeRoot(final String kind, final NormalizedNode node) { + final File file = new File(System.getProperty("karaf.data", "."), + "failed-" + kind + "-snapshot-" + shardName + ".xml"); + NormalizedNodeXMLOutput.toFile(file, node); + return file; } /** @@ -120,26 +89,19 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { final ShardDataTreeSnapshot snapshot; try { snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); - } catch (IOException e) { - log.error("{}: failed to deserialize snapshot", e); + } catch (Exception e) { + log.error("{}: failed to deserialize snapshot", shardName, e); throw Throwables.propagate(e); } - final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(), - store.getDataTree(), store.getSchemaContext()); - - final NormalizedNode node = snapshot.getRootNode().orElse(null); - tx.write(YangInstanceIdentifier.EMPTY, node); - try { - commitTransaction(tx); + store.applyRecoverySnapshot(snapshot); } catch (Exception e) { - File file = new File(System.getProperty("karaf.data", "."), - "failed-recovery-snapshot-" + shardName + ".xml"); - NormalizedNodeXMLOutput.toFile(file, node); - throw new RuntimeException(String.format( - "%s: Failed to apply recovery snapshot. Node data was written to file %s", - shardName, file), e); + log.error("{}: failed to apply snapshot {}", shardName, snapshot, e); + + final File f = writeRoot("recovery", snapshot.getRootNode().orElse(null)); + throw new IllegalStateException(String.format( + "%s: Failed to apply recovery snapshot. Node data was written to file %s", shardName, f), e); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 8bef15bbba..adf60a0c21 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -11,18 +11,14 @@ import akka.actor.ActorContext; import akka.actor.ActorRef; import com.google.common.base.Preconditions; import java.io.IOException; -import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; /** @@ -33,17 +29,13 @@ import org.slf4j.Logger; class ShardSnapshotCohort implements RaftActorSnapshotCohort { private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); - private final LocalHistoryIdentifier applyHistoryId; private final ActorRef snapshotActor; private final ShardDataTree store; private final String logId; private final Logger log; - private long applyCounter; - private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) { - this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId); this.snapshotActor = Preconditions.checkNotNull(snapshotActor); this.store = Preconditions.checkNotNull(store); this.log = log; @@ -66,10 +58,17 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { @Override public void createSnapshot(final ActorRef actorRef) { // Forward the request to the snapshot actor - ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef); + ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef); } - private void deserializeAndApplySnapshot(final byte[] snapshotBytes) { + @Override + public void applySnapshot(final byte[] snapshotBytes) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + + log.info("{}: Applying snapshot", logId); + final ShardDataTreeSnapshot snapshot; try { snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes); @@ -79,33 +78,12 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { } try { - final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction( - new TransactionIdentifier(applyHistoryId, applyCounter++)); - - // delete everything first - transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY); - - final Optional> maybeNode = snapshot.getRootNode(); - if (maybeNode.isPresent()) { - // Add everything from the remote node back - transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get()); - } - - store.applyRecoveryTransaction(transaction); + store.applySnapshot(snapshot); } catch (Exception e) { - log.error("{}: An exception occurred when applying snapshot", logId, e); + log.error("{}: Failed to apply snapshot {}", logId, snapshot, e); + return; } - } - - @Override - public void applySnapshot(final byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - log.info("{}: Applying snapshot", logId); - deserializeAndApplySnapshot(snapshotBytes); log.info("{}: Done applying snapshot", logId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index bb016a28bd..5fac0abe6b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -20,7 +20,6 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; @@ -61,15 +60,12 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Ide } @Override - public DataTreeModification getDataTreeModification() { - DataTreeModification dataTreeModification = transaction; - if (transaction instanceof PruningDataTreeModification){ - dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification(); - } - return dataTreeModification; + + DataTreeModification getDataTreeModification() { + return transaction; } - private void checkState(State expected) { + private void checkState(final State expected) { Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index dd27b4e629..c348727cf8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -19,7 +19,6 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; -import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -31,7 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; * @author Robert Varga */ @Beta -public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable { +public final class CommitTransactionPayload extends Payload implements Serializable { private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; private byte[] serialized; @@ -78,10 +77,9 @@ public final class CommitTransactionPayload extends Payload implements DataTreeC return new CommitTransactionPayload(out.toByteArray()); } - @Override - public Entry, DataTreeCandidate> getCandidate() throws IOException { + public Entry getCandidate() throws IOException { final DataInput in = ByteStreams.newDataInput(serialized); - return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)), + return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), DataTreeCandidateInputOutput.readDataTreeCandidate(in)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java deleted file mode 100644 index 4cd1c4a975..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.persisted; - -import com.google.common.annotations.Beta; -import java.io.IOException; -import java.util.Map.Entry; -import java.util.Optional; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; - -/** - * Interim interface for consolidating DataTreeCandidatePayload and {@link CommitTransactionPayload}. - * - * @author Robert Varga - */ -@Beta -public interface DataTreeCandidateSupplier { - Entry, DataTreeCandidate> getCandidate() throws IOException; -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java index 682c0b73ec..8cde0d9532 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Map; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An {@link AbstractVersionedShardDataTreeSnapshot} which contains additional metadata. @@ -30,8 +32,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot implements Serializable { private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MetadataShardDataTreeSnapshot.class); - private Map, ShardDataTreeSnapshotMetadata> metadata; + private Map>, ShardDataTreeSnapshotMetadata> metadata; private NormalizedNode rootNode; public Proxy() { @@ -46,7 +49,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeInt(metadata.size()); - for (ShardDataTreeSnapshotMetadata m : metadata.values()) { + for (ShardDataTreeSnapshotMetadata m : metadata.values()) { out.writeObject(m); } @@ -59,11 +62,15 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize); // Default pre-allocate is 4, which should be fine - final Builder, ShardDataTreeSnapshotMetadata> metaBuilder = + final Builder>, ShardDataTreeSnapshotMetadata> metaBuilder = ImmutableMap.builder(); for (int i = 0; i < metaSize; ++i) { - final ShardDataTreeSnapshotMetadata m = (ShardDataTreeSnapshotMetadata) in.readObject(); - metaBuilder.put(m.getClass(), m); + final ShardDataTreeSnapshotMetadata m = (ShardDataTreeSnapshotMetadata) in.readObject(); + if (m != null) { + metaBuilder.put(m.getType(), m); + } else { + LOG.warn("Skipping null metadata"); + } } metadata = metaBuilder.build(); @@ -77,7 +84,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD private static final long serialVersionUID = 1L; - private final Map, ShardDataTreeSnapshotMetadata> metadata; + private final Map>, ShardDataTreeSnapshotMetadata> metadata; private final NormalizedNode rootNode; public MetadataShardDataTreeSnapshot(final NormalizedNode rootNode) { @@ -85,12 +92,12 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD } public MetadataShardDataTreeSnapshot(final NormalizedNode rootNode, - final Map, ShardDataTreeSnapshotMetadata> metadata) { + final Map>, ShardDataTreeSnapshotMetadata> metadata) { this.rootNode = Preconditions.checkNotNull(rootNode); this.metadata = ImmutableMap.copyOf(metadata); } - public Map, ShardDataTreeSnapshotMetadata> getMetadata() { + public Map>, ShardDataTreeSnapshotMetadata> getMetadata() { return metadata; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java index a20ec4eba4..7941c9fa59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java @@ -26,7 +26,7 @@ import javax.annotation.Nonnull; * * @author Robert Varga */ -public abstract class ShardDataTreeSnapshotMetadata implements Serializable { +public abstract class ShardDataTreeSnapshotMetadata> implements Serializable { private static final long serialVersionUID = 1L; ShardDataTreeSnapshotMetadata() { @@ -43,4 +43,6 @@ public abstract class ShardDataTreeSnapshotMetadata implements Serializable { * @return Externalizable proxy, may not be null */ protected abstract @Nonnull Externalizable externalizableProxy(); + + public abstract Class getType(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModification.java index 69e2c88d26..697b0c516a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModification.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore.utils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ForwardingObject; import java.io.IOException; import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; @@ -29,7 +31,7 @@ import org.slf4j.LoggerFactory; * The PruningDataTreeModification first removes all entries from the data which do not belong in the schemaContext * before delegating it to the actual DataTreeModification */ -public class PruningDataTreeModification implements DataTreeModification { +public class PruningDataTreeModification extends ForwardingObject implements DataTreeModification { private static final Logger LOG = LoggerFactory.getLogger(PruningDataTreeModification.class); private DataTreeModification delegate; @@ -37,9 +39,14 @@ public class PruningDataTreeModification implements DataTreeModification { private final DataTree dataTree; public PruningDataTreeModification(DataTreeModification delegate, DataTree dataTree, SchemaContext schemaContext) { - this.delegate = delegate; - this.dataTree = dataTree; - this.schemaContext = schemaContext; + this.delegate = Preconditions.checkNotNull(delegate); + this.dataTree = Preconditions.checkNotNull(dataTree); + this.schemaContext = Preconditions.checkNotNull(schemaContext); + } + + @Override + public DataTreeModification delegate() { + return delegate; } @Override @@ -140,10 +147,6 @@ public class PruningDataTreeModification implements DataTreeModification { return pruner.normalizedNode(); } - public DataTreeModification getResultingModification(){ - return delegate; - } - private static class PruningDataTreeModificationCursor extends AbstractDataTreeModificationCursor { private final DataTreeModification toModification; private final PruningDataTreeModification pruningModification; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java index cce9bddde0..3a7b89f91e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java @@ -119,13 +119,13 @@ public class DataTreeCandidatePayloadTest { @Test public void testCandidateSerDes() throws IOException { final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate().getValue()); + assertCandidateEquals(candidate, payload.getCandidate()); } @Test public void testPayloadSerDes() throws IOException { final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue()); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -140,7 +140,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate().getValue()); + assertCandidateEquals(candidate, payload.getCandidate()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -156,7 +156,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate().getValue()); + assertCandidateEquals(candidate, payload.getCandidate()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -172,7 +172,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate().getValue()); + assertCandidateEquals(candidate, payload.getCandidate()); } @Test @@ -183,6 +183,6 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate().getValue()); + assertCandidateEquals(candidate, payload.getCandidate()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index d4dcc9cda2..243b2cb7e2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -393,7 +393,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testPeerAddressResolved() throws Exception { new ShardTestKit(getSystem()) {{ - ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config"); + final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config"); final TestActorRef shard = actorFactory.createTestActor(newShardBuilder(). peerAddresses(Collections.singletonMap(peerID.toString(), null)).props(). withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved"); @@ -402,7 +402,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender()); shard.tell(GetOnDemandRaftState.INSTANCE, getRef()); - OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class); + final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class); assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString())); }}; } @@ -433,14 +433,14 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender()); - Stopwatch sw = Stopwatch.createStarted(); + final Stopwatch sw = Stopwatch.createStarted(); while(sw.elapsed(TimeUnit.SECONDS) <= 5) { Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS); try { assertEquals("Root node", expected, readStore(shard, root)); return; - } catch(AssertionError e) { + } catch(final AssertionError e) { // try again } } @@ -455,19 +455,28 @@ public class ShardTest extends AbstractShardTest { ShardTestKit.waitUntilLeader(shard); - final DataTree source = setupInMemorySnapshotStore(); - final DataTreeModification writeMod = source.takeSnapshot().newModification(); - ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + store.setSchemaContext(SCHEMA_CONTEXT); + writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + final NormalizedNode root = readStore(store, YangInstanceIdentifier.EMPTY); + final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), + Collections. emptyList(), 1, 2, 3, 4); + + shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender()); + + final DataTreeModification writeMod = store.takeSnapshot().newModification(); + final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); writeMod.write(TestModel.TEST_PATH, node); writeMod.ready(); final TransactionIdentifier tx = nextTransactionId(); final ApplyState applyState = new ApplyState(null, tx, - new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx))); + new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx))); shard.tell(applyState, shard); - Stopwatch sw = Stopwatch.createStarted(); + final Stopwatch sw = Stopwatch.createStarted(); while(sw.elapsed(TimeUnit.SECONDS) <= 5) { Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS); @@ -532,7 +541,7 @@ public class ShardTest extends AbstractShardTest { final TransactionIdentifier transactionID2 = nextTransactionId(); final TransactionIdentifier transactionID3 = nextTransactionId(); - Map cohortMap = setupCohortDecorator( + final Map cohortMap = setupCohortDecorator( shard.underlyingActor(), transactionID1, transactionID2, transactionID3); final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1); final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2); @@ -797,9 +806,9 @@ public class ShardTest extends AbstractShardTest { // Test merge with invalid data. An exception should occur when the merge is applied. Note that // write will not validate the children for performance reasons. - TransactionIdentifier transactionID = nextTransactionId(); + final TransactionIdentifier transactionID = nextTransactionId(); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); @@ -808,7 +817,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(batched, getRef()); Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); - Throwable cause = failure.cause(); + final Throwable cause = failure.cause(); batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); batched.setReady(true); @@ -1108,7 +1117,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); if(readWrite) { - ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore(). + final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore(). newReadWriteTransaction(transactionID); shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef()); } else { @@ -1764,7 +1773,7 @@ public class ShardTest extends AbstractShardTest { // Now send CanCommitTransaction - should fail. shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); - Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause(); + final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause(); assertTrue("Failure type", failure instanceof IllegalStateException); // Ready and CanCommit another and verify success. @@ -2056,7 +2065,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - String testName = "testClusteredDataChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataChangeListenerDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000). customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2089,7 +2098,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testClusteredDataChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - String testName = "testClusteredDataChangeListenerRegistration"; + final String testName = "testClusteredDataChangeListenerRegistration"; final ShardIdentifier followerShardID = ShardIdentifier.create("inventory", MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config"); @@ -2110,7 +2119,7 @@ public class ShardTest extends AbstractShardTest { withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - String leaderPath = waitUntilLeader(followerShard); + final String leaderPath = waitUntilLeader(followerShard); assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); final YangInstanceIdentifier path = TestModel.TEST_PATH; @@ -2132,7 +2141,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; dataStoreContextBuilder.shardElectionTimeoutFactor(1000). customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); @@ -2163,7 +2172,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testClusteredDataTreeChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - String testName = "testClusteredDataTreeChangeListenerRegistration"; + final String testName = "testClusteredDataTreeChangeListenerRegistration"; final ShardIdentifier followerShardID = ShardIdentifier.create("inventory", MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config"); @@ -2184,7 +2193,7 @@ public class ShardTest extends AbstractShardTest { withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - String leaderPath = waitUntilLeader(followerShard); + final String leaderPath = waitUntilLeader(followerShard); assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); final YangInstanceIdentifier path = TestModel.TEST_PATH; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java index f25153f335..2edd619793 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java @@ -53,7 +53,7 @@ public class ShardDataTreeSnapshotTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - Map, ShardDataTreeSnapshotMetadata> expMetadata = + Map>, ShardDataTreeSnapshotMetadata> expMetadata = ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test")); MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata); byte[] serialized = snapshot.serialize(); @@ -84,7 +84,7 @@ public class ShardDataTreeSnapshotTest { assertEquals("Deserialized type", PreBoronShardDataTreeSnapshot.class, deserialized.getClass()); } - static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata { + static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata { private static final long serialVersionUID = 1L; private final String data; @@ -93,6 +93,11 @@ public class ShardDataTreeSnapshotTest { this.data = data; } + @Override + public Class getType() { + return TestShardDataTreeSnapshotMetadata.class; + } + @Override protected Externalizable externalizableProxy() { return new Proxy(data); @@ -108,7 +113,6 @@ public class ShardDataTreeSnapshotTest { return data.equals(((TestShardDataTreeSnapshotMetadata)obj).data); } - private static class Proxy implements Externalizable { private String data; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java index c078c94637..4671a8ef15 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PruningDataTreeModificationTest.java @@ -293,7 +293,7 @@ public class PruningDataTreeModificationTest { private DataTreeCandidateTip getCandidate() throws DataValidationFailedException { pruningDataTreeModification.ready(); - DataTreeModification mod = pruningDataTreeModification.getResultingModification(); + DataTreeModification mod = pruningDataTreeModification.delegate(); mod = mod == proxyModification ? realModification : mod; dataTree.validate(mod); DataTreeCandidateTip candidate = dataTree.prepare(mod); -- 2.36.6