import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.slf4j.Logger;
/**
* committed to the data store in the order the corresponding snapshot or log batch are received
* to preserve data store integrity.
*
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-
- private final InMemoryDOMDataStore store;
+ private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+ private final ShardDataTree store;
private List<ModificationPayload> currentLogRecoveryBatch;
private final String shardName;
private final Logger log;
- ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+ ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
this.store = store;
this.shardName = shardName;
this.log = log;
}
- private void commitTransaction(DOMStoreWriteTransaction transaction) {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
try {
commitCohort.preCommit().get();
commitCohort.commit().get();
public void applyCurrentLogRecoveryBatch() {
log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- for(ModificationPayload payload: currentLogRecoveryBatch) {
+ ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
+ DataTreeModification snapshot = writeTx.getSnapshot();
+ for (ModificationPayload payload : currentLogRecoveryBatch) {
try {
- MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+ MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
} catch (Exception e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
*/
@Override
public void applyRecoverySnapshot(final byte[] snapshotBytes) {
- log.debug("{}: Applyng recovered sbapshot", shardName);
+ log.debug("{}: Applying recovered snapshot", shardName);
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ // Intentionally bypass normal transaction to side-step persistence/replication
+ final DataTree tree = store.getDataTree();
+ DataTreeModification writeTx = tree.takeSnapshot().newModification();
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- writeTx.write(YangInstanceIdentifier.builder().build(), node);
-
- commitTransaction(writeTx);
+ writeTx.write(ROOT, node);
+ writeTx.ready();
+ try {
+ tree.validate(writeTx);
+ tree.commit(tree.prepare(writeTx));
+ } catch (DataValidationFailedException e) {
+ log.error("{}: Failed to validate recovery snapshot", shardName, e);
+ }
}
}