Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Bug 1435: CDS: Added support for custom commit cohort.
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
ShardRecoveryCoordinator.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
index f8c1db987912e44e487088abb9dd3d903b0c8fee..776ab276c612dfd131e8ade7a598123b26c5c7b1 100644
(file)
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
@@ -9,11 +9,7 @@
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
import java.io.IOException;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.net.URI;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
@@ -37,25 +33,27 @@
import org.slf4j.Logger;
* @author Thomas Pantelis
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
* @author Thomas Pantelis
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
- private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
private final ShardDataTree store;
private final String shardName;
private final Logger log;
private final ShardDataTree store;
private final String shardName;
private final Logger log;
- private final Set<URI> validNamespaces;
+ private final SchemaContext schemaContext;
private PruningDataTreeModification transaction;
private int size;
private PruningDataTreeModification transaction;
private int size;
+ private final byte[] restoreFromSnapshot;
- ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
+ ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+ String shardName, Logger log) {
this.store = Preconditions.checkNotNull(store);
this.store = Preconditions.checkNotNull(store);
+ this.restoreFromSnapshot = restoreFromSnapshot;
this.shardName = shardName;
this.log = log;
this.shardName = shardName;
this.log = log;
- this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
+ this.schemaContext = schemaContext;
}
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
- transaction = new PruningDataTreeModification(store.newModification(), validNamespaces);
+ transaction = new PruningDataTreeModification(store.newModification(), store.getDataTree(), schemaContext);
size = 0;
}
size = 0;
}
@@ -67,10 +65,6 @@
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
if (payload instanceof DataTreeCandidatePayload) {
DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
size++;
if (payload instanceof DataTreeCandidatePayload) {
DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
size++;
- } else if (payload instanceof ModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((ModificationPayload) payload).getModification()).apply(transaction);
- size++;
} else if (payload instanceof CompositeModificationPayload) {
MutableCompositeModification.fromSerializable(
((CompositeModificationPayload) payload).getModification()).apply(transaction);
} else if (payload instanceof CompositeModificationPayload) {
MutableCompositeModification.fromSerializable(
((CompositeModificationPayload) payload).getModification()).apply(transaction);
@@ -82,13 +76,13 @@
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (IOException | ClassNotFoundException e) {
+ } catch (IOException e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
private void commitTransaction(PruningDataTreeModification tx) throws DataValidationFailedException {
- store.commit(tx.getDelegate());
+ store.commit(tx.getResultingModification());
}
/**
}
/**
@@ -117,12 +111,18 @@
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
log.debug("{}: Applying recovered snapshot", shardName);
final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
log.debug("{}: Applying recovered snapshot", shardName);
final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(), validNamespaces);
- tx.write(ROOT, node);
+ final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
+ store.getDataTree(), schemaContext);
+ tx.write(YangInstanceIdentifier.EMPTY, node);
try {
commitTransaction(tx);
} catch (DataValidationFailedException e) {
log.error("{}: Failed to apply recovery snapshot", shardName, e);
}
}
try {
commitTransaction(tx);
} catch (DataValidationFailedException e) {
log.error("{}: Failed to apply recovery snapshot", shardName, e);
}
}
+
+ @Override
+ public byte[] getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
}
}