import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
* @deprecated Deprecated in Boron in favor of CommitTransactionPayload
*/
@Deprecated
-final class DataTreeCandidatePayload extends Payload implements DataTreeCandidateSupplier, Externalizable {
+final class DataTreeCandidatePayload extends Payload implements Externalizable {
private static final long serialVersionUID = 1L;
private transient byte[] serialized;
return new DataTreeCandidatePayload(out.toByteArray());
}
-
- @Override
- public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
- return new SimpleImmutableEntry<>(Optional.empty(),
- DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized)));
+ public DataTreeCandidate getCandidate() throws IOException {
+ return DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized));
}
@Override
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof DataTreeCandidateSupplier) {
- if (clientActor == null) {
- // No clientActor indicates a replica coming from the leader
- try {
- store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data);
- } catch (DataValidationFailedException | IOException e) {
- LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
- }
- } else {
- // Replication consensus reached, proceed to commit
- store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data);
+ if (data instanceof Payload) {
+ try {
+ store.applyReplicatedPayload(identifier, (Payload)data);
+ } catch (DataValidationFailedException | IOException e) {
+ LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}
} else {
- LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
- data.getClass().getClassLoader());
+ LOG.error("{}: Unknown state for {} received {}", persistenceId(), identifier, data);
}
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.primitives.UnsignedLong;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
+ private final Collection<ShardDataTreeMetadata<?>> metadata;
private final TipProducingDataTree dataTree;
private final String logContext;
private final Shard shard;
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
- final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
- this.dataTree = dataTree;
+ final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+ final ShardDataTreeMetadata<?>... metadata) {
+ this.dataTree = Preconditions.checkNotNull(dataTree);
updateSchemaContext(schemaContext);
this.shard = Preconditions.checkNotNull(shard);
this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
+ this.metadata = ImmutableList.copyOf(metadata);
}
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
this.schemaContext = Preconditions.checkNotNull(schemaContext);
}
-    ShardDataTreeSnapshot takeRecoverySnapshot() {
-        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+    /**
+     * Take a snapshot of current state for later recovery.
+     *
+     * @return A state snapshot
+     */
+    @Nonnull ShardDataTreeSnapshot takeStateSnapshot() {
+        final NormalizedNode<?, ?> rootNode = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get();
+        final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
+            ImmutableMap.builder();
+
+        // Collect one serializable fragment from each metadata tracker; trackers with nothing to
+        // persist return null and are omitted from the snapshot map.
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            // NOTE(review): "toStapshot" is a typo for "toSnapshot" in ShardDataTreeMetadata; rename in a
+            // follow-up patch touching both the declaration and this call site together.
+            final ShardDataTreeSnapshotMetadata<?> meta = m.toStapshot();
+            if (meta != null) {
+                metaBuilder.put(meta.getType(), meta);
+            }
+        }
+
+        return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
-    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
-        // FIXME: purge any outstanding transactions
+    /**
+     * Replace the data tree contents and metadata state with the supplied snapshot.
+     *
+     * @param snapshot snapshot to apply
+     * @param wrapper transform applied to the fresh modification (identity, or pruning for recovery)
+     * @throws DataValidationFailedException when the snapshot fails validation
+     */
+    private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+            final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
+        final Stopwatch elapsed = Stopwatch.createStarted();
-        final DataTreeModification snapshot = transaction.getSnapshot();
-        snapshot.ready();
+        if (!pendingTransactions.isEmpty()) {
+            LOG.warn("{}: applying state snapshot with pending transactions", logContext);
+        }
+
+        final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> snapshotMeta;
+        if (snapshot instanceof MetadataShardDataTreeSnapshot) {
+            snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+        } else {
+            snapshotMeta = ImmutableMap.of();
+        }
+
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            final ShardDataTreeSnapshotMetadata<?> s = snapshotMeta.get(m.getSupportedType());
+            if (s != null) {
+                m.applySnapshot(s);
+            } else {
+                // Snapshot carries no fragment for this tracker: drop any previously-accumulated state
+                m.reset();
+            }
+        }
+
+        final DataTreeModification mod = wrapper.apply(dataTree.takeSnapshot().newModification());
+        // delete everything first
+        mod.delete(YangInstanceIdentifier.EMPTY);
+
+        final java.util.Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+        if (maybeNode.isPresent()) {
+            // Add everything from the remote node back
+            mod.write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+        }
+        mod.ready();
+
+        final DataTreeModification unwrapped = unwrap(mod);
+        dataTree.validate(unwrapped);
+        dataTree.commit(dataTree.prepare(unwrapped));
+        // FIX: use an SLF4J '{}' placeholder; the original '%s' is a String.format token which SLF4J
+        // does not interpolate, so 'elapsed' would never appear in the log message.
+        LOG.debug("{}: state snapshot applied in {}", logContext, elapsed);
+    }
+
+ private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
+ return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+ }
+
+ private static DataTreeModification unwrap(final DataTreeModification modification) {
+ if (modification instanceof PruningDataTreeModification) {
+ return ((PruningDataTreeModification)modification).delegate();
+ }
+ return modification;
+ }
+
+ /**
+ * Apply a snapshot coming from recovery. This method does not assume the SchemaContexts match and performs data
+ * pruning in an attempt to adjust the state to our current SchemaContext.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, this::wrapWithPruning);
+ }
- dataTree.validate(snapshot);
- dataTree.commit(dataTree.prepare(snapshot));
+
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
+ private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+ final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
+ DataTreeCandidates.applyToModification(mod, candidate);
+ mod.ready();
+
+ final DataTreeModification unwrapped = mod.delegate();
+ LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
+
+ dataTree.validate(unwrapped);
+ dataTree.commit(dataTree.prepare(unwrapped));
+ }
+
+    /**
+     * Apply a payload coming from recovery. This method does not assume the SchemaContexts match and performs data
+     * pruning in an attempt to adjust the state to our current SchemaContext.
+     *
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+        if (payload instanceof CommitTransactionPayload) {
+            final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                ((CommitTransactionPayload) payload).getCandidate();
+            applyRecoveryCandidate(e.getValue());
+            allMetadataCommittedTransaction(e.getKey());
+        } else if (payload instanceof DataTreeCandidatePayload) {
+            applyRecoveryCandidate(((DataTreeCandidatePayload) payload).getCandidate());
+        } else {
+            // Unknown payloads are skipped rather than failing recovery outright
+            LOG.warn("{}: ignoring unhandled payload {}", logContext, payload);
+        }
+    }
+
+ private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ throws DataValidationFailedException {
+ LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+
+ final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ DataTreeCandidates.applyToModification(mod, foreign);
+ mod.ready();
+
+ LOG.trace("{}: Applying foreign modification {}", logContext, mod);
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
+
+ notifyListeners(candidate);
+ }
+
+    /**
+     * Apply a payload coming from the leader, which could actually be us. This method assumes the leader and follower
+     * SchemaContexts match and does not perform any pruning.
+     *
+     * @param identifier Payload identifier as returned from RaftActor
+     * @param payload Payload
+     * @throws IOException when the snapshot fails to deserialize
+     * @throws DataValidationFailedException when the snapshot fails to apply
+     */
+    void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+            DataValidationFailedException {
+        /*
+         * This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
+         * if we are the leader and it has originated with us.
+         *
+         * The identifier will only ever be non-null when we were the leader which achieved consensus. Unfortunately,
+         * though, this may not be the case anymore, as we are being called some time afterwards and we may not be
+         * acting in that capacity anymore.
+         *
+         * In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
+         * pre-Boron state -- which limits the number of options here.
+         */
+        if (!(payload instanceof CommitTransactionPayload)) {
+            LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
+            return;
+        }
+
+        if (identifier == null) {
+            // Follower (or pre-consensus replica): materialize the candidate and apply it directly
+            final Entry<TransactionIdentifier, DataTreeCandidate> e =
+                ((CommitTransactionPayload) payload).getCandidate();
+            applyReplicatedCandidate(e.getKey(), e.getValue());
+            allMetadataCommittedTransaction(e.getKey());
+        } else {
+            // We were the leader for this transaction: consensus reached, finish the pending commit
+            Verify.verify(identifier instanceof TransactionIdentifier);
+            payloadReplicationComplete((TransactionIdentifier) identifier);
+        }
+    }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ final CommitEntry current = pendingTransactions.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ private void allMetadataCommittedTransaction(final TransactionIdentifier txId) {
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.transactionCommitted(txId);
+ }
}
private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
return pendingTransactions.size();
}
- void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
- LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
-
- final DataTreeModification mod = dataTree.takeSnapshot().newModification();
- DataTreeCandidates.applyToModification(mod, foreign);
- mod.ready();
-
- LOG.trace("{}: Applying foreign modification {}", logContext, mod);
- dataTree.validate(mod);
- final DataTreeCandidate candidate = dataTree.prepare(mod);
- dataTree.commit(candidate);
- notifyListeners(candidate);
- }
-
@Override
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
// Intentional no-op
return dataTree.takeSnapshot().newModification();
}
+ /**
+ * @deprecated This method violates DataTree containment and will be removed.
+ */
@VisibleForTesting
- // FIXME: This should be removed, it violates encapsulation
+ @Deprecated
public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
modification.ready();
dataTree.validate(modification);
- DataTreeCandidateTip candidate = dataTree.prepare(modification);
+ DataTreeCandidate candidate = dataTree.prepare(modification);
dataTree.commit(candidate);
return candidate;
}
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
try {
- try {
- dataTree.commit(candidate);
- } catch (IllegalStateException e) {
- // We may get a "store tree and candidate base differ" IllegalStateException from commit under
- // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
- // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
- // applying it to the state. We then become the leader and a second tx is pre-committed and
- // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
- // candidate via applyState prior to the second tx. Since the second tx has already been
- // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
- // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
- // solution will be forthcoming.
-
- LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
- applyForeignCandidate(txId, candidate);
- }
+ dataTree.commit(candidate);
} catch (Exception e) {
+ LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
failCommit(e);
return;
}
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
-
pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
}
- private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
- final CommitEntry current = pendingTransactions.peek();
- if (current == null) {
- LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
- return;
- }
-
- if (!current.cohort.getIdentifier().equals(txId)) {
- LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
- current.cohort.getIdentifier(), txId);
- return;
- }
-
- finishCommit(current.cohort);
- }
-
- void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
- // For now we do not care about anything else but transactions
- Verify.verify(identifier instanceof TransactionIdentifier);
- payloadReplicationComplete((TransactionIdentifier)identifier, payload);
- }
-
void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
return cohort;
}
- void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
- throws DataValidationFailedException, IOException {
- applyForeignCandidate(identifier, payload.getCandidate().getValue());
- }
-
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Verify;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+
+/**
+ * Base class for shard-private metadata trackers. Implementations receive transaction lifecycle
+ * callbacks and may contribute a serializable snapshot fragment, which is stored alongside the
+ * data tree state and routed back to the matching tracker when a snapshot is applied.
+ *
+ * @param <T> concrete snapshot-fragment type supported by this tracker
+ */
+abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
+    /**
+     * Apply a snapshot fragment. Verifies the fragment matches {@link #getSupportedType()} before
+     * casting and dispatching to the type-safe {@code doApplySnapshot()}.
+     *
+     * @param snapshot snapshot fragment routed to this tracker
+     */
+    final void applySnapshot(@Nonnull final ShardDataTreeSnapshotMetadata<?> snapshot) {
+        Verify.verify(getSupportedType().isInstance(snapshot), "Snapshot %s misrouted to handler of %s", snapshot,
+            getSupportedType());
+        doApplySnapshot(getSupportedType().cast(snapshot));
+    }
+
+    // Discard accumulated state, e.g. when an applied snapshot carries no fragment for this tracker
+    abstract void reset();
+
+    // Type-safe application of a verified snapshot fragment
+    abstract void doApplySnapshot(@Nonnull T snapshot);
+
+    abstract @Nonnull Class<T> getSupportedType();
+
+    // Produce the fragment to persist, or null when there is nothing to store.
+    // NOTE(review): "toStapshot" is a typo for "toSnapshot"; rename in a follow-up patch touching
+    // both this declaration and the call site in ShardDataTree.takeStateSnapshot() together.
+    abstract @Nullable T toStapshot();
+
+    // Lifecycle events
+    abstract void transactionCommitted(TransactionIdentifier txId);
+}
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.File;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
-import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.slf4j.Logger;
/**
private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private PruningDataTreeModification transaction;
- private int size;
private final byte[] restoreFromSnapshot;
- ShardRecoveryCoordinator(ShardDataTree store, byte[] restoreFromSnapshot, String shardName, Logger log) {
+ private boolean open;
+
+ ShardRecoveryCoordinator(final ShardDataTree store, final byte[] restoreFromSnapshot, final String shardName, final Logger log) {
this.store = Preconditions.checkNotNull(store);
- this.restoreFromSnapshot = restoreFromSnapshot;
this.shardName = Preconditions.checkNotNull(shardName);
this.log = Preconditions.checkNotNull(log);
+
+ this.restoreFromSnapshot = restoreFromSnapshot;
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
- transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(),
- store.getSchemaContext());
- size = 0;
+ open = true;
}
@Override
- public void appendRecoveredLogEntry(Payload payload) {
- Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+ public void appendRecoveredLogEntry(final Payload payload) {
+ Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
try {
- if (payload instanceof DataTreeCandidateSupplier) {
- final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
- ((DataTreeCandidateSupplier)payload).getCandidate();
-
- DataTreeCandidates.applyToModification(transaction, e.getValue());
- size++;
-
- if (e.getKey().isPresent()) {
- // FIXME: BUG-5280: propagate transaction state
- }
- } else {
- log.error("{}: Unknown payload {} received during recovery", shardName, payload);
- }
- } catch (IOException e) {
- log.error("{}: Error extracting payload", shardName, e);
+ store.applyRecoveryPayload(payload);
+ } catch (Exception e) {
+ log.error("{}: failed to apply payload {}", shardName, payload, e);
+ throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+ shardName, payload), e);
}
}
- private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
- store.commit(tx.getResultingModification());
- }
-
/**
* Applies the current batched log entries to the data store.
*/
@Override
public void applyCurrentLogRecoveryBatch() {
- Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+ Preconditions.checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+ open = false;
+ }
- log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
- try {
- commitTransaction(transaction);
- } catch (Exception e) {
- File file = new File(System.getProperty("karaf.data", "."),
- "failed-recovery-batch-" + shardName + ".out");
- DataTreeModificationOutput.toFile(file, transaction.getResultingModification());
- throw new RuntimeException(String.format(
- "%s: Failed to apply recovery batch. Modification data was written to file %s",
- shardName, file), e);
- }
- transaction = null;
+ private File writeRoot(final String kind, final NormalizedNode<?, ?> node) {
+ final File file = new File(System.getProperty("karaf.data", "."),
+ "failed-" + kind + "-snapshot-" + shardName + ".xml");
+ NormalizedNodeXMLOutput.toFile(file, node);
+ return file;
}
/**
final ShardDataTreeSnapshot snapshot;
try {
snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
- } catch (IOException e) {
- log.error("{}: failed to deserialize snapshot", e);
+ } catch (Exception e) {
+ log.error("{}: failed to deserialize snapshot", shardName, e);
throw Throwables.propagate(e);
}
- final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
- store.getDataTree(), store.getSchemaContext());
-
- final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
- tx.write(YangInstanceIdentifier.EMPTY, node);
-
try {
- commitTransaction(tx);
+ store.applyRecoverySnapshot(snapshot);
} catch (Exception e) {
- File file = new File(System.getProperty("karaf.data", "."),
- "failed-recovery-snapshot-" + shardName + ".xml");
- NormalizedNodeXMLOutput.toFile(file, node);
- throw new RuntimeException(String.format(
- "%s: Failed to apply recovery snapshot. Node data was written to file %s",
- shardName, file), e);
+ log.error("{}: failed to apply snapshot {}", shardName, snapshot, e);
+
+ final File f = writeRoot("recovery", snapshot.getRootNode().orElse(null));
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery snapshot. Node data was written to file %s", shardName, f), e);
}
}
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
/**
class ShardSnapshotCohort implements RaftActorSnapshotCohort {
private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
- private final LocalHistoryIdentifier applyHistoryId;
private final ActorRef snapshotActor;
private final ShardDataTree store;
private final String logId;
private final Logger log;
- private long applyCounter;
-
private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
final ShardDataTree store, final Logger log, final String logId) {
- this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId);
this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
this.store = Preconditions.checkNotNull(store);
this.log = log;
@Override
public void createSnapshot(final ActorRef actorRef) {
// Forward the request to the snapshot actor
- ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef);
+ ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef);
}
- private void deserializeAndApplySnapshot(final byte[] snapshotBytes) {
+ @Override
+ public void applySnapshot(final byte[] snapshotBytes) {
+ // Since this will be done only on Recovery or when this actor is a Follower
+ // we can safely commit everything in here. We not need to worry about event notifications
+ // as they would have already been disabled on the follower
+
+ log.info("{}: Applying snapshot", logId);
+
final ShardDataTreeSnapshot snapshot;
try {
snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
}
try {
- final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
- new TransactionIdentifier(applyHistoryId, applyCounter++));
-
- // delete everything first
- transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY);
-
- final Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
- if (maybeNode.isPresent()) {
- // Add everything from the remote node back
- transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get());
- }
-
- store.applyRecoveryTransaction(transaction);
+ store.applySnapshot(snapshot);
} catch (Exception e) {
- log.error("{}: An exception occurred when applying snapshot", logId, e);
+ log.error("{}: Failed to apply snapshot {}", logId, snapshot, e);
+ return;
}
- }
-
- @Override
- public void applySnapshot(final byte[] snapshotBytes) {
- // Since this will be done only on Recovery or when this actor is a Follower
- // we can safely commit everything in here. We not need to worry about event notifications
- // as they would have already been disabled on the follower
-
- log.info("{}: Applying snapshot", logId);
- deserializeAndApplySnapshot(snapshotBytes);
log.info("{}: Done applying snapshot", logId);
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
}
@Override
- public DataTreeModification getDataTreeModification() {
- DataTreeModification dataTreeModification = transaction;
- if (transaction instanceof PruningDataTreeModification){
- dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
- }
- return dataTreeModification;
+
+ DataTreeModification getDataTreeModification() {
+ return transaction;
}
- private void checkState(State expected) {
+ private void checkState(final State expected) {
Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
}
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
-import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
* @author Robert Varga
*/
@Beta
-public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable {
+public final class CommitTransactionPayload extends Payload implements Serializable {
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private byte[] serialized;
return new CommitTransactionPayload(out.toByteArray());
}
- @Override
- public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
+ public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
final DataInput in = ByteStreams.newDataInput(serialized);
- return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)),
+ return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
DataTreeCandidateInputOutput.readDataTreeCandidate(in));
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.persisted;
-
-import com.google.common.annotations.Beta;
-import java.io.IOException;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-/**
- * Interim interface for consolidating DataTreeCandidatePayload and {@link CommitTransactionPayload}.
- *
- * @author Robert Varga
- */
-@Beta
-public interface DataTreeCandidateSupplier {
- Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException;
-}
import java.util.Map;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link AbstractVersionedShardDataTreeSnapshot} which contains additional metadata.
public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot implements Serializable {
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataShardDataTreeSnapshot.class);
- private Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+ private Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata;
private NormalizedNode<?, ?> rootNode;
public Proxy() {
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
out.writeInt(metadata.size());
- for (ShardDataTreeSnapshotMetadata m : metadata.values()) {
+ for (ShardDataTreeSnapshotMetadata<?> m : metadata.values()) {
out.writeObject(m);
}
Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
// Default pre-allocate is 4, which should be fine
- final Builder<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metaBuilder =
+ final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
ImmutableMap.builder();
for (int i = 0; i < metaSize; ++i) {
- final ShardDataTreeSnapshotMetadata m = (ShardDataTreeSnapshotMetadata) in.readObject();
- metaBuilder.put(m.getClass(), m);
+ final ShardDataTreeSnapshotMetadata<?> m = (ShardDataTreeSnapshotMetadata<?>) in.readObject();
+ if (m != null) {
+ metaBuilder.put(m.getType(), m);
+ } else {
+ LOG.warn("Skipping null metadata");
+ }
}
metadata = metaBuilder.build();
private static final long serialVersionUID = 1L;
- private final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+ private final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata;
private final NormalizedNode<?, ?> rootNode;
public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode) {
}
public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
- final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata) {
+ final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
this.rootNode = Preconditions.checkNotNull(rootNode);
this.metadata = ImmutableMap.copyOf(metadata);
}
- public Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> getMetadata() {
+ public Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> getMetadata() {
return metadata;
}
*
* @author Robert Varga
*/
-public abstract class ShardDataTreeSnapshotMetadata implements Serializable {
+public abstract class ShardDataTreeSnapshotMetadata<T extends ShardDataTreeSnapshotMetadata<T>> implements Serializable {
private static final long serialVersionUID = 1L;
ShardDataTreeSnapshotMetadata() {
* @return Externalizable proxy, may not be null
*/
protected abstract @Nonnull Externalizable externalizableProxy();
+
+ public abstract Class<T> getType();
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ForwardingObject;
import java.io.IOException;
import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
* The PruningDataTreeModification first removes all entries from the data which do not belong in the schemaContext
* before delegating it to the actual DataTreeModification
*/
-public class PruningDataTreeModification implements DataTreeModification {
+public class PruningDataTreeModification extends ForwardingObject implements DataTreeModification {
private static final Logger LOG = LoggerFactory.getLogger(PruningDataTreeModification.class);
private DataTreeModification delegate;
private final DataTree dataTree;
public PruningDataTreeModification(DataTreeModification delegate, DataTree dataTree, SchemaContext schemaContext) {
- this.delegate = delegate;
- this.dataTree = dataTree;
- this.schemaContext = schemaContext;
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ }
+
+ @Override
+ public DataTreeModification delegate() {
+ return delegate;
}
@Override
return pruner.normalizedNode();
}
- public DataTreeModification getResultingModification(){
- return delegate;
- }
-
private static class PruningDataTreeModificationCursor extends AbstractDataTreeModificationCursor {
private final DataTreeModification toModification;
private final PruningDataTreeModification pruningModification;
@Test
public void testCandidateSerDes() throws IOException {
final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
public void testPayloadSerDes() throws IOException {
final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue());
+ assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode);
DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
@Test
DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode);
DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
- assertCandidateEquals(candidate, payload.getCandidate().getValue());
+ assertCandidateEquals(candidate, payload.getCandidate());
}
}
@Test
public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
- ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
+ final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
- OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
+ final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
}};
}
shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
- Stopwatch sw = Stopwatch.createStarted();
+ final Stopwatch sw = Stopwatch.createStarted();
while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
try {
assertEquals("Root node", expected, readStore(shard, root));
return;
- } catch(AssertionError e) {
+ } catch(final AssertionError e) {
// try again
}
}
ShardTestKit.waitUntilLeader(shard);
- final DataTree source = setupInMemorySnapshotStore();
- final DataTreeModification writeMod = source.takeSnapshot().newModification();
- ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+ store.setSchemaContext(SCHEMA_CONTEXT);
+ writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
+ final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
+ Collections.<ReplicatedLogEntry> emptyList(), 1, 2, 3, 4);
+
+ shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
+
+ final DataTreeModification writeMod = store.takeSnapshot().newModification();
+ final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
writeMod.write(TestModel.TEST_PATH, node);
writeMod.ready();
final TransactionIdentifier tx = nextTransactionId();
final ApplyState applyState = new ApplyState(null, tx,
- new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
+ new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx)));
shard.tell(applyState, shard);
- Stopwatch sw = Stopwatch.createStarted();
+ final Stopwatch sw = Stopwatch.createStarted();
while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
final TransactionIdentifier transactionID2 = nextTransactionId();
final TransactionIdentifier transactionID3 = nextTransactionId();
- Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+ final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
// Test merge with invalid data. An exception should occur when the merge is applied. Note that
// write will not validate the children for performance reasons.
- TransactionIdentifier transactionID = nextTransactionId();
+ final TransactionIdentifier transactionID = nextTransactionId();
- ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
shard.tell(batched, getRef());
Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
- Throwable cause = failure.cause();
+ final Throwable cause = failure.cause();
batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
final FiniteDuration duration = duration("5 seconds");
if(readWrite) {
- ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
newReadWriteTransaction(transactionID);
shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
} else {
// Now send CanCommitTransaction - should fail.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
assertTrue("Failure type", failure instanceof IllegalStateException);
// Ready and CanCommit another and verify success.
@Test
public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataChangeListenerDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
@Test
public void testClusteredDataChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- String testName = "testClusteredDataChangeListenerRegistration";
+ final String testName = "testClusteredDataChangeListenerRegistration";
final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- String leaderPath = waitUntilLeader(followerShard);
+ final String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
@Test
public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
@Test
public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final String testName = "testClusteredDataTreeChangeListenerRegistration";
final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- String leaderPath = waitUntilLeader(followerShard);
+ final String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> expMetadata =
+ Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> expMetadata =
ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
byte[] serialized = snapshot.serialize();
assertEquals("Deserialized type", PreBoronShardDataTreeSnapshot.class, deserialized.getClass());
}
- static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata {
+ static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata<TestShardDataTreeSnapshotMetadata> {
private static final long serialVersionUID = 1L;
private final String data;
this.data = data;
}
+ @Override
+ public Class<TestShardDataTreeSnapshotMetadata> getType() {
+ return TestShardDataTreeSnapshotMetadata.class;
+ }
+
@Override
protected Externalizable externalizableProxy() {
return new Proxy(data);
return data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
}
-
private static class Proxy implements Externalizable {
private String data;
private DataTreeCandidateTip getCandidate() throws DataValidationFailedException {
pruningDataTreeModification.ready();
- DataTreeModification mod = pruningDataTreeModification.getResultingModification();
+ DataTreeModification mod = pruningDataTreeModification.delegate();
mod = mod == proxyModification ? realModification : mod;
dataTree.validate(mod);
DataTreeCandidateTip candidate = dataTree.prepare(mod);