*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.List;
+import java.net.URI;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
+import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
/**
* committed to the data store in the order the corresponding snapshot or log batch are received
* to preserve data store integrity.
*
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
*/
-class ShardRecoveryCoordinator {
-
- private final InMemoryDOMDataStore store;
- private List<ModificationPayload> currentLogRecoveryBatch;
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
+ private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+ private final ShardDataTree store;
private final String shardName;
private final Logger log;
-
- ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
- this.store = store;
+ private final Set<URI> validNamespaces;
+ private PruningDataTreeModification transaction;
+ private int size;
+ private final byte[] restoreFromSnapshot;
+
+ ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+ String shardName, Logger log) {
+ this.store = Preconditions.checkNotNull(store);
+ this.restoreFromSnapshot = restoreFromSnapshot;
this.shardName = shardName;
this.log = log;
+ this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
}
- void startLogRecoveryBatch(int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+ transaction = new PruningDataTreeModification(store.newModification(), validNamespaces);
+ size = 0;
}
- void appendRecoveredLogPayload(Payload payload) {
+ @Override
+ public void appendRecoveredLogEntry(Payload payload) {
+ Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
+
try {
- if(payload instanceof ModificationPayload) {
- currentLogRecoveryBatch.add((ModificationPayload) payload);
+ if (payload instanceof DataTreeCandidatePayload) {
+ DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+ size++;
+ } else if (payload instanceof ModificationPayload) {
+ MutableCompositeModification.fromSerializable(
+ ((ModificationPayload) payload).getModification()).apply(transaction);
+ size++;
} else if (payload instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationPayload) payload).getModification())));
+ MutableCompositeModification.fromSerializable(
+ ((CompositeModificationPayload) payload).getModification()).apply(transaction);
+ size++;
} else if (payload instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationByteStringPayload) payload).getModification())));
+ MutableCompositeModification.fromSerializable(
+ ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+ size++;
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (IOException e) {
+ } catch (IOException | ClassNotFoundException e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
-
}
- private void commitTransaction(DOMStoreWriteTransaction transaction) {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- try {
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- } catch (Exception e) {
- log.error("{}: Failed to commit Tx on recovery", shardName, e);
- }
+ private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
+ store.commit(tx.getDelegate());
}
/**
* Applies the current batched log entries to the data store.
*/
- void applyCurrentLogRecoveryBatch() {
- log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
+ @Override
+ public void applyCurrentLogRecoveryBatch() {
+ Preconditions.checkState(transaction != null, "call startLogRecovery before calling applyCurrentLogRecoveryBatch");
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- for(ModificationPayload payload: currentLogRecoveryBatch) {
- try {
- MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
- } catch (Exception e) {
- log.error("{}: Error extracting ModificationPayload", shardName, e);
- }
+ log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
+ try {
+ commitTransaction(transaction);
+ } catch (DataValidationFailedException e) {
+ log.error("{}: Failed to apply recovery batch", shardName, e);
}
-
- commitTransaction(writeTx);
-
- currentLogRecoveryBatch = null;
+ transaction = null;
}
/**
*
* @param snapshotBytes the serialized snapshot
*/
- void applyRecoveredSnapshot(final byte[] snapshotBytes) {
- log.debug("{}: Applyng recovered sbapshot", shardName);
-
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ @Override
+ public void applyRecoverySnapshot(final byte[] snapshotBytes) {
+ log.debug("{}: Applying recovered snapshot", shardName);
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- writeTx.write(YangInstanceIdentifier.builder().build(), node);
+ final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+ final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(), validNamespaces);
+ tx.write(ROOT, node);
+ try {
+ commitTransaction(tx);
+ } catch (DataValidationFailedException e) {
+ log.error("{}: Failed to apply recovery snapshot", shardName, e);
+ }
+ }
- commitTransaction(writeTx);
+ @Override
+ public byte[] getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
}
}