*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
-import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
*
* @author Thomas Pantelis
*/
-class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
- private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+abstract class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
+ private static final class Simple extends ShardRecoveryCoordinator {
+ Simple(final ShardDataTree store, final String shardName, final Logger log) {
+ super(store, shardName, log);
+ }
+
+ @Override
+ public Snapshot getRestoreFromSnapshot() {
+ return null;
+ }
+ }
+
+ private static final class WithSnapshot extends ShardRecoveryCoordinator {
+ private final Snapshot restoreFromSnapshot;
+
+ WithSnapshot(final ShardDataTree store, final String shardName, final Logger log, final Snapshot snapshot) {
+ super(store, shardName, log);
+ restoreFromSnapshot = requireNonNull(snapshot);
+ }
+
+ @Override
+ public Snapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+ }
+
private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private final Set<URI> validNamespaces;
- private PruningDataTreeModification transaction;
- private int size;
-
- ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
- this.store = Preconditions.checkNotNull(store);
- this.shardName = shardName;
- this.log = log;
- this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
+
+ private boolean open;
+
+ ShardRecoveryCoordinator(final ShardDataTree store, final String shardName, final Logger log) {
+ this.store = requireNonNull(store);
+ this.shardName = requireNonNull(shardName);
+ this.log = requireNonNull(log);
+ }
+
+ static ShardRecoveryCoordinator create(final ShardDataTree store, final String shardName, final Logger log) {
+ return new Simple(store, shardName, log);
+ }
+
+ static ShardRecoveryCoordinator forSnapshot(final ShardDataTree store, final String shardName, final Logger log,
+ final Snapshot snapshot) {
+ return new WithSnapshot(store, shardName, log, snapshot);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
- transaction = new PruningDataTreeModification(store.newModification(), validNamespaces);
- size = 0;
+ open = true;
}
@Override
- public void appendRecoveredLogEntry(Payload payload) {
- Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void appendRecoveredLogEntry(final Payload payload) {
+ checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
try {
- if (payload instanceof DataTreeCandidatePayload) {
- DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
- size++;
- } else if (payload instanceof ModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((ModificationPayload) payload).getModification()).apply(transaction);
- size++;
- } else if (payload instanceof CompositeModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((CompositeModificationPayload) payload).getModification()).apply(transaction);
- size++;
- } else if (payload instanceof CompositeModificationByteStringPayload) {
- MutableCompositeModification.fromSerializable(
- ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
- size++;
- } else {
- log.error("{}: Unknown payload {} received during recovery", shardName, payload);
- }
- } catch (IOException | ClassNotFoundException e) {
- log.error("{}: Error extracting ModificationPayload", shardName, e);
+ store.applyRecoveryPayload(payload);
+ } catch (Exception e) {
+ log.error("{}: failed to apply payload {}", shardName, payload, e);
+ throw new IllegalStateException(String.format("%s: Failed to apply recovery payload %s",
+ shardName, payload), e);
}
}
- private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
- store.commit(tx.getDelegate());
- }
-
/**
* Applies the current batched log entries to the data store.
*/
@Override
public void applyCurrentLogRecoveryBatch() {
- Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+ checkState(open, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
+ open = false;
+ }
- log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
- try {
- commitTransaction(transaction);
- } catch (DataValidationFailedException e) {
- log.error("{}: Failed to apply recovery batch", shardName, e);
- }
- transaction = null;
+ private File writeRoot(final String kind, final NormalizedNode node) {
+ final File file = new File(System.getProperty("karaf.data", "."),
+ "failed-recovery-" + kind + "-" + shardName + ".xml");
+ NormalizedNodeXMLOutput.toFile(file, node);
+ return file;
}
/**
* Applies a recovered snapshot to the data store.
*
- * @param snapshotBytes the serialized snapshot
+ * @param snapshotState the serialized snapshot
*/
@Override
- public void applyRecoverySnapshot(final byte[] snapshotBytes) {
- log.debug("{}: Applying recovered snapshot", shardName);
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
+ if (!(snapshotState instanceof ShardSnapshotState)) {
+ log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", shardName, snapshotState);
+ return;
+ }
- final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(), validNamespaces);
- tx.write(ROOT, node);
+ log.debug("{}: Applying recovered snapshot", shardName);
+ final ShardSnapshotState shardSnapshotState = (ShardSnapshotState)snapshotState;
try {
- commitTransaction(tx);
- } catch (DataValidationFailedException e) {
- log.error("{}: Failed to apply recovery snapshot", shardName, e);
+ store.applyRecoverySnapshot(shardSnapshotState);
+ } catch (Exception e) {
+ final ShardDataTreeSnapshot shardSnapshot = shardSnapshotState.getSnapshot();
+ final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null));
+ throw new IllegalStateException(String.format(
+ "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
+ shardName, shardSnapshot, f), e);
}
}
-
- @Override
- public byte[] getRestoreFromSnapshot() {
- // TODO Auto-generated method stub
- return null;
- }
}