*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
- private final DataTree store;
+ private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private DataTreeModification transaction;
+ private final Set<URI> validNamespaces;
+ private PruningDataTreeModification transaction;
private int size;
+ private final byte[] restoreFromSnapshot;
- ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
- this.store = store.getDataTree();
+ ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+ String shardName, Logger log) {
+ this.store = Preconditions.checkNotNull(store);
+ this.restoreFromSnapshot = restoreFromSnapshot;
this.shardName = shardName;
this.log = log;
+ this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
}
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
- transaction = store.takeSnapshot().newModification();
+ transaction = new PruningDataTreeModification(store.newModification(), validNamespaces);
size = 0;
}
@Override
public void appendRecoveredLogEntry(Payload payload) {
+ Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+
try {
if (payload instanceof DataTreeCandidatePayload) {
DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
}
}
- private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException {
- tx.ready();
- store.validate(tx);
- store.commit(store.prepare(tx));
+ private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
+ store.commit(tx.getDelegate());
}
/**
*/
@Override
public void applyCurrentLogRecoveryBatch() {
+ Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+
log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
try {
commitTransaction(transaction);
log.debug("{}: Applying recovered snapshot", shardName);
final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- final DataTreeModification tx = store.takeSnapshot().newModification();
+ final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(), validNamespaces);
tx.write(ROOT, node);
try {
commitTransaction(tx);
log.error("{}: Failed to apply recovery snapshot", shardName, e);
}
}
+
+ @Override
+ public byte[] getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
}