X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardRecoveryCoordinator.java;h=f9d305001567ad4b47706169de873ffafb36fb47;hp=01a124b6977c801e3f273c57341efe91d97c52b2;hb=56c1339ee7dbd85bc567fc44f21ecfd322c9e803;hpb=5de57714fa057ac80f930c2fcce2758ca0a5f514 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 01a124b697..f9d3050015 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -17,11 +17,12 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; /** @@ -31,16 +32,16 @@ import org.slf4j.Logger; * committed to the data store in the order the corresponding snapshot or log batch are received * to preserve data store integrity. * - * @author Thomas Panetelis + * @author Thomas Pantelis */ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { - - private final InMemoryDOMDataStore store; + private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build(); + private final ShardDataTree store; private List currentLogRecoveryBatch; private final String shardName; private final Logger log; - ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) { + ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) { this.store = store; this.shardName = shardName; this.log = log; @@ -73,8 +74,8 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { } - private void commitTransaction(DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) { + DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); try { commitCohort.preCommit().get(); commitCohort.commit().get(); @@ -90,10 +91,11 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public void applyCurrentLogRecoveryBatch() { log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); - DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); - for(ModificationPayload payload: currentLogRecoveryBatch) { + ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null); + DataTreeModification snapshot = writeTx.getSnapshot(); + for (ModificationPayload payload : currentLogRecoveryBatch) { try { - MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx); + MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot); } catch (Exception e) { log.error("{}: Error extracting ModificationPayload", shardName, e); } @@ -111,14 +113,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override public void applyRecoverySnapshot(final byte[] snapshotBytes) { - log.debug("{}: Applyng recovered sbapshot", shardName); + log.debug("{}: Applying recovered snapshot", shardName); - DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); + // Intentionally bypass normal transaction to side-step persistence/replication + final DataTree tree = store.getDataTree(); + DataTreeModification writeTx = tree.takeSnapshot().newModification(); NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - writeTx.write(YangInstanceIdentifier.builder().build(), node); - - commitTransaction(writeTx); + writeTx.write(ROOT, node); + writeTx.ready(); + try { + tree.validate(writeTx); + tree.commit(tree.prepare(writeTx)); + } catch (DataValidationFailedException e) { + log.error("{}: Failed to validate recovery snapshot", shardName, e); + } } }