import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
+
+ private ShardSnapshot restoreFromSnapshot;
+
+
+
protected Shard(AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
this.datastoreContext = builder.getDatastoreContext();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
+ // FIXME: BUG-1014: pass down the proper TreeType
store = new ShardDataTree(builder.getSchemaContext());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
commitCoordinator = new ShardCommitCoordinator(store,
datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
- datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
+ datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
setTransactionCommitTimeout();
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+
}
private void setTransactionCommitTimeout() {
context().parent().tell(message, self());
} else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
sender().tell(getShardMBean(), self());
- } else if(message instanceof GetShardDataTree){
+ } else if(message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
+ } else if(message instanceof ServerRemoved){
+ context().parent().forward(message, context());
} else {
super.onReceiveCommand(message);
}
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ DataTreeCandidatePayload.create(candidate));
}
}
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+ restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
+ restoreFromSnapshot = null;
+
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
private SchemaContext schemaContext;
+ private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private volatile boolean sealed;
protected AbstractBuilder(Class<S> shardClass) {
return self();
}
+ public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ checkSealed();
+ this.restoreFromSnapshot = restoreFromSnapshot;
+ return self();
+ }
+
public ShardIdentifier getId() {
return id;
}
return schemaContext;
}
+ public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+ return restoreFromSnapshot;
+ }
+
protected void verify() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");