BUG-4626: Introduce NormalizedNodeData{Input,Output}
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index ee83ce2513dc1f9c3c8582820254ebdc3c5957c6..0bb886a12defb5a70df4f39b34866f4a2cc3890f 100644 (file)
@@ -39,6 +39,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
@@ -61,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -114,17 +117,24 @@ public class Shard extends RaftActor {
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
+
+    private ShardSnapshot restoreFromSnapshot;
+
+
+
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
+        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
         setPersistence(datastoreContext.isPersistent());
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
+        // FIXME: BUG-1014: pass down the proper TreeType
         store = new ShardDataTree(builder.getSchemaContext());
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
@@ -137,7 +147,7 @@ public class Shard extends RaftActor {
 
         commitCoordinator = new ShardCommitCoordinator(store,
                 datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
-                datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
+                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
 
         setTransactionCommitTimeout();
 
@@ -154,6 +164,7 @@ public class Shard extends RaftActor {
         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
 
 
+
     }
 
     private void setTransactionCommitTimeout() {
@@ -251,8 +262,10 @@ public class Shard extends RaftActor {
                 context().parent().tell(message, self());
             } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
                 sender().tell(getShardMBean(), self());
-            } else if(message instanceof GetShardDataTree){
+            } else if(message instanceof GetShardDataTree) {
                 sender().tell(store.getDataTree(), self());
+            } else if(message instanceof ServerRemoved){
+                context().parent().forward(message, context());
             } else {
                 super.onReceiveCommand(message);
             }
@@ -325,7 +338,7 @@ public class Shard extends RaftActor {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                DataTreeCandidatePayload.create(candidate));
+                    DataTreeCandidatePayload.create(candidate));
         }
     }
 
@@ -585,11 +598,14 @@ public class Shard extends RaftActor {
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-        return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+        return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+                restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
     }
 
     @Override
     protected void onRecoveryComplete() {
+        restoreFromSnapshot = null;
+
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
@@ -712,6 +728,7 @@ public class Shard extends RaftActor {
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
         private SchemaContext schemaContext;
+        private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private volatile boolean sealed;
 
         protected AbstractBuilder(Class<S> shardClass) {
@@ -751,6 +768,12 @@ public class Shard extends RaftActor {
             return self();
         }
 
+        public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return self();
+        }
+
         public ShardIdentifier getId() {
             return id;
         }
@@ -767,6 +790,10 @@ public class Shard extends RaftActor {
             return schemaContext;
         }
 
+        public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+            return restoreFromSnapshot;
+        }
+
         protected void verify() {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");