package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
+import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
* @author Thomas Pantelis
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
- private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
- private final DataTree store;
+ private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private final Set<URI> validNamespaces;
private PruningDataTreeModification transaction;
private int size;
+ private final byte[] restoreFromSnapshot;
- ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
- this.store = store.getDataTree();
- this.shardName = shardName;
- this.log = log;
- this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
+ ShardRecoveryCoordinator(ShardDataTree store, byte[] restoreFromSnapshot, String shardName, Logger log) {
+ this.store = Preconditions.checkNotNull(store);
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.log = Preconditions.checkNotNull(log);
}
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
- transaction = new PruningDataTreeModification(store.takeSnapshot().newModification(), validNamespaces);
+ transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(),
+ store.getSchemaContext());
size = 0;
}
Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
try {
- if (payload instanceof DataTreeCandidatePayload) {
- DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
- size++;
- } else if (payload instanceof ModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((ModificationPayload) payload).getModification()).apply(transaction);
- size++;
- } else if (payload instanceof CompositeModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((CompositeModificationPayload) payload).getModification()).apply(transaction);
- size++;
- } else if (payload instanceof CompositeModificationByteStringPayload) {
- MutableCompositeModification.fromSerializable(
- ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+ if (payload instanceof DataTreeCandidateSupplier) {
+ final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
+ ((DataTreeCandidateSupplier)payload).getCandidate();
+
+ DataTreeCandidates.applyToModification(transaction, e.getValue());
size++;
+
+ if (e.getKey().isPresent()) {
+ // FIXME: BUG-5280: propagate transaction state
+ }
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (IOException | ClassNotFoundException e) {
- log.error("{}: Error extracting ModificationPayload", shardName, e);
+ } catch (IOException e) {
+ log.error("{}: Error extracting payload", shardName, e);
}
}
private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
- DataTreeModification delegate = tx.getDelegate();
- delegate.ready();
- store.validate(delegate);
- store.commit(store.prepare(delegate));
+ store.commit(tx.getResultingModification());
}
/**
log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
try {
commitTransaction(transaction);
- } catch (DataValidationFailedException e) {
- log.error("{}: Failed to apply recovery batch", shardName, e);
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-batch-" + shardName + ".out");
+ DataTreeModificationOutput.toFile(file, transaction.getResultingModification());
+ throw new RuntimeException(String.format(
+ "%s: Failed to apply recovery batch. Modification data was written to file %s",
+ shardName, file), e);
}
transaction = null;
}
public void applyRecoverySnapshot(final byte[] snapshotBytes) {
log.debug("{}: Applying recovered snapshot", shardName);
- final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- final PruningDataTreeModification tx = new PruningDataTreeModification(store.takeSnapshot().newModification(), validNamespaces);
- tx.write(ROOT, node);
+ final ShardDataTreeSnapshot snapshot;
+ try {
+ snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
+ } catch (IOException e) {
+ log.error("{}: failed to deserialize snapshot", e);
+ throw Throwables.propagate(e);
+ }
+
+ final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
+ store.getDataTree(), store.getSchemaContext());
+
+ final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
+ tx.write(YangInstanceIdentifier.EMPTY, node);
+
try {
commitTransaction(tx);
- } catch (DataValidationFailedException e) {
- log.error("{}: Failed to apply recovery snapshot", shardName, e);
+ } catch (Exception e) {
+ File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-snapshot-" + shardName + ".xml");
+ NormalizedNodeXMLOutput.toFile(file, node);
+ throw new RuntimeException(String.format(
+ "%s: Failed to apply recovery snapshot. Node data was written to file %s",
+ shardName, file), e);
}
}
+
+ @Override
+ public byte[] getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
}