Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index d0bb3d3b69824d18b2acdb08a4defb50d31953b2..7073ea758b2b751d93708914bfc3935779076908 100644 (file)
@@ -27,9 +27,19 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -55,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -75,15 +86,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.Nonnull;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A Shard represents a portion of the logical data tree <br/>
  * <p>
@@ -104,19 +106,20 @@ public class Shard extends RaftActor {
     private final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-    // By default persistent will be true and can be turned off using the system
-    // property shard.persistent
-    private final boolean persistent;
-
     /// The name of this shard
     private final ShardIdentifier name;
 
     private final ShardStats shardMBean;
 
-    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+    private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();
+
+    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
+                                                                       Lists.newArrayList();
 
     private final DatastoreContext datastoreContext;
 
+    private final DataPersistenceProvider dataPersistenceProvider;
+
     private SchemaContext schemaContext;
 
     private ActorRef createSnapshotTransaction;
@@ -129,6 +132,8 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
+    private Optional<ActorRef> roleChangeNotifier;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -137,20 +142,17 @@ public class Shard extends RaftActor {
 
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            DatastoreContext datastoreContext, SchemaContext schemaContext) {
+    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses),
                 Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
+        this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
 
-        String setting = System.getProperty("shard.persistent");
-
-        this.persistent = !"false".equals(setting);
-
-        LOG.info("Shard created : {} persistent : {}", name, persistent);
+        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
@@ -161,7 +163,6 @@ public class Shard extends RaftActor {
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
         if (isMetricsCaptureEnabled()) {
@@ -173,10 +174,13 @@ public class Shard extends RaftActor {
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+        // create a notifier actor for each cluster member
+        roleChangeNotifier = createRoleChangeNotifier(name.toString());
     }
 
     private static Map<String, String> mapPeerAddresses(
-        Map<ShardIdentifier, String> peerAddresses) {
+        final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
 
         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
@@ -189,7 +193,7 @@ public class Shard extends RaftActor {
 
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
@@ -198,6 +202,12 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
+    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+    }
+
     @Override
     public void postStop() {
         super.postStop();
@@ -208,7 +218,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(Object message) {
+    public void onReceiveRecover(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -217,13 +227,17 @@ public class Shard extends RaftActor {
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+            // Even though recovery failed, we still need to finish our recovery, eg send the
+            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+            onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
         }
     }
 
     @Override
-    public void onReceiveCommand(Object message) {
+    public void onReceiveCommand(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
@@ -257,6 +271,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
@@ -270,7 +289,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCommitTransaction(CommitTransaction commit) {
+    private void handleCommitTransaction(final CommitTransaction commit) {
         final String transactionID = commit.getTransactionID();
 
         LOG.debug("Committing transaction {}", transactionID);
@@ -301,12 +320,8 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().preCommit().get();
 
-            if(persistent) {
-                Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
-            } else {
-                Shard.this.finishCommit(getSender(), transactionID);
-            }
+            Shard.this.persistData(getSender(), transactionID,
+                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -369,13 +384,14 @@ public class Shard extends RaftActor {
         commitCoordinator.currentTransactionComplete(transactionID, true);
     }
 
-    private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}", ready.getTransactionID());
+        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+                ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -383,19 +399,29 @@ public class Shard extends RaftActor {
         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                 ready.getModification());
 
-        // Return our actor path as we'll handle the three phase commit.
-        ReadyTransactionReply readyTransactionReply =
-            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
-        getSender().tell(
-            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
-            getSelf());
+        // Return our actor path as we'll handle the three phase commit, except if the Tx client
+        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+        // node. In that case, the subsequent 3-phase commit messages won't contain the
+        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+        // to provide the compatible behavior.
+        ActorRef replyActorPath = self();
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                    ready.getTransactionID()));
+        }
+
+        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+                Serialization.serializedActorPath(replyActorPath));
+        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, getSelf());
     }
 
-    private void handleAbortTransaction(AbortTransaction abort) {
+    private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(String transactionID, final ActorRef sender) {
+    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -410,7 +436,7 @@ public class Shard extends RaftActor {
 
             Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void v) {
+                public void onSuccess(final Void v) {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
@@ -419,7 +445,7 @@ public class Shard extends RaftActor {
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
+                public void onFailure(final Throwable t) {
                     LOG.error(t, "An exception happened during abort");
 
                     if(sender != null) {
@@ -430,20 +456,20 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCreateTransaction(Object message) {
+    private void handleCreateTransaction(final Object message) {
         if (isLeader()) {
             createTransaction(CreateTransaction.fromSerializable(message));
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
                 "Could not find shard leader so transaction cannot be created. This typically happens" +
-                " when system is coming up or recovering and a leader is being elected. Try again" +
+                " when the system is coming up or recovering and a leader is being elected. Try again" +
                 " later.")), getSelf());
         }
     }
 
-    private void handleReadDataReply(Object message) {
+    private void handleReadDataReply(final Object message) {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
@@ -457,7 +483,7 @@ public class Shard extends RaftActor {
         getSender().tell(PoisonPill.getInstance(), self());
     }
 
-    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
 
@@ -466,10 +492,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTypedTransactionActor(
-        int transactionType,
-        ShardTransactionIdentifier transactionId,
-        String transactionChainId ) {
+    private ActorRef createTypedTransactionActor(int transactionType,
+            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -482,8 +506,8 @@ public class Shard extends RaftActor {
             }
         }
 
-        if(this.schemaContext == null){
-            throw new NullPointerException("schemaContext should not be null");
+        if(this.schemaContext == null) {
+            throw new IllegalStateException("SchemaContext is not set");
         }
 
         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
@@ -493,7 +517,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                         schemaContext,datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -502,7 +527,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
 
         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
@@ -512,7 +538,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -521,39 +548,44 @@ public class Shard extends RaftActor {
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
-        createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+        try {
+            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+                createTransaction.getVersion());
+
+            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+                    createTransaction.getTransactionId()).toSerializable(), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+            String transactionChainId, int clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
-        ActorRef transactionActor =
-            createTypedTransactionActor(transactionType, transactionId, transactionChainId);
 
-        getSender()
-            .tell(new CreateTransactionReply(
-                    Serialization.serializedActorPath(transactionActor),
-                    remoteTransactionId).toSerializable(),
-                getSelf());
+        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+                transactionChainId, clientVersion);
 
         return transactionActor;
     }
 
-    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
         throws ExecutionException, InterruptedException {
         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
 
-    private void commitWithNewTransaction(Modification modification) {
+    private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
@@ -566,59 +598,66 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void updateSchemaContext(UpdateSchemaContext message) {
+    private void updateSchemaContext(final UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
-    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+    @VisibleForTesting
+    void updateSchemaContext(final SchemaContext schemaContext) {
         store.onGlobalContextUpdated(schemaContext);
     }
 
-    private void registerChangeListener(
-        RegisterChangeListener registerChangeListener) {
+    private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("registerDataChangeListener for {}", registerChangeListener
-                .getPath());
+        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                     NormalizedNode<?, ?>>> registration;
+        if(isLeader()) {
+            registration = doChangeListenerRegistration(registerChangeListener);
+        } else {
+            LOG.debug("Shard is not the leader - delaying registration");
+
+            DelayedListenerRegistration delayedReg =
+                    new DelayedListenerRegistration(registerChangeListener);
+            delayedListenerRegistrations.add(delayedReg);
+            registration = delayedReg;
         }
 
+        ActorRef listenerRegistration = getContext().actorOf(
+                DataChangeListenerRegistration.props(registration));
 
-        ActorSelection dataChangeListenerPath = getContext()
-            .system().actorSelection(
-                registerChangeListener.getDataChangeListenerPath());
+        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+                    listenerRegistration.path());
+
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+    }
 
+    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
+            final RegisterChangeListener registerChangeListener) {
+
+        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+                registerChangeListener.getDataChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
         // If this shard is the leader then it will enable notifications else
         // it will not
-        dataChangeListenerPath
-            .tell(new EnableNotification(isLeader()), getSelf());
+        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
         dataChangeListeners.add(dataChangeListenerPath);
 
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
 
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            registration = store.registerChangeListener(registerChangeListener.getPath(),
-                listener, registerChangeListener.getScope());
-        ActorRef listenerRegistration =
-            getContext().actorOf(
-                DataChangeListenerRegistration.props(registration));
+        LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(
-                "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-                , listenerRegistration.path().toString());
-        }
-
-        getSender()
-            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
-                getSelf());
+        return store.registerChangeListener(registerChangeListener.getPath(), listener,
+                registerChangeListener.getScope());
     }
 
     private boolean isMetricsCaptureEnabled(){
@@ -628,7 +667,7 @@ public class Shard extends RaftActor {
 
     @Override
     protected
-    void startLogRecoveryBatch(int maxBatchSize) {
+    void startLogRecoveryBatch(final int maxBatchSize) {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         if(LOG.isDebugEnabled()) {
@@ -637,7 +676,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else {
@@ -646,7 +685,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(ByteString snapshot) {
+    protected void applyRecoverySnapshot(final ByteString snapshot) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
@@ -699,16 +738,19 @@ public class Shard extends RaftActor {
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
-        // Schedule a message to be periodically sent to check if the current in-progress
-        // transaction should be expired and aborted.
-        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
-        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
-                period, period, getSelf(),
-                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        // Being paranoid here - this method should only be called once but just in case...
+        if(txCommitTimeoutCheckSchedule == null) {
+            // Schedule a message to be periodically sent to check if the current in-progress
+            // transaction should be expired and aborted.
+            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                    period, period, getSelf(),
+                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        }
     }
 
     @Override
-    protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
@@ -756,7 +798,8 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "");
+                "createSnapshot" + ++createSnapshotTransactionCounter, "",
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
@@ -766,7 +809,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(ByteString snapshot) {
+    protected void applySnapshot(final ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -791,17 +834,28 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override protected void onStateChanged() {
+    @Override
+    protected void onStateChanged() {
+        boolean isLeader = isLeader();
         for (ActorSelection dataChangeListener : dataChangeListeners) {
-            dataChangeListener
-                .tell(new EnableNotification(isLeader()), getSelf());
+            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+        }
+
+        if(isLeader) {
+            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+                if(!reg.isClosed()) {
+                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+                }
+            }
+
+            delayedListenerRegistrations.clear();
         }
 
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
 
         // If this actor is no longer the leader close all the transaction chains
-        if(!isLeader()){
+        if(!isLeader){
             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                 if(LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -815,7 +869,12 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
+    @Override
+    protected DataPersistenceProvider persistence() {
+        return dataPersistenceProvider;
+    }
+
+    @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.setLeader(newLeader);
     }
 
@@ -823,6 +882,11 @@ public class Shard extends RaftActor {
         return this.name.toString();
     }
 
+    @VisibleForTesting
+    DataPersistenceProvider getDataPersistenceProvider() {
+        return dataPersistenceProvider;
+    }
+
     private static class ShardCreator implements Creator<Shard> {
 
         private static final long serialVersionUID = 1L;
@@ -832,8 +896,8 @@ public class Shard extends RaftActor {
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-                DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
             this.datastoreContext = datastoreContext;
@@ -855,4 +919,45 @@ public class Shard extends RaftActor {
     ShardStats getShardMBean() {
         return shardMBean;
     }
+
+    private static class DelayedListenerRegistration implements
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+        private volatile boolean closed;
+
+        private final RegisterChangeListener registerChangeListener;
+
+        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                             NormalizedNode<?, ?>>> delegate;
+
+        DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
+            this.registerChangeListener = registerChangeListener;
+        }
+
+        void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                            NormalizedNode<?, ?>>> registration) {
+            this.delegate = registration;
+        }
+
+        boolean isClosed() {
+            return closed;
+        }
+
+        RegisterChangeListener getRegisterChangeListener() {
+            return registerChangeListener;
+        }
+
+        @Override
+        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+            return delegate != null ? delegate.getInstance() : null;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+            if(delegate != null) {
+                delegate.close();
+            }
+        }
+    }
 }