+ @Override
+ protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+
+ if(data instanceof ModificationPayload) {
+ try {
+ applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
+ }
+ }
+ else if (data instanceof CompositeModificationPayload) {
+ Object modification = ((CompositeModificationPayload) data).getModification();
+
+ applyModificationToState(clientActor, identifier, modification);
+ } else if(data instanceof CompositeModificationByteStringPayload ){
+ Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+ applyModificationToState(clientActor, identifier, modification);
+ } else {
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
+ CompositeModificationPayload.class.getClassLoader());
+ }
+
+ updateJournalStats();
+
+ }
+
+ private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ if(modification == null) {
+ LOG.error(
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
+ } else if(clientActor == null) {
+ // There's no clientActor to which to send a commit reply so we must be applying
+ // replicated state from the leader.
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+ } else {
+ // This must be the OK to commit after replication consensus.
+ finishCommit(clientActor, identifier);
+ }
+ }
+
+ private void updateJournalStats() {
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+
+ if (lastLogEntry != null) {
+ shardMBean.setLastLogIndex(lastLogEntry.getIndex());
+ shardMBean.setLastLogTerm(lastLogEntry.getTerm());
+ }
+
+ shardMBean.setCommitIndex(getCommitIndex());
+ shardMBean.setLastApplied(getLastApplied());
+ shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
+ }
+
+ @Override
+ protected void createSnapshot() {
+ // Create a transaction actor. We are really going to treat the transaction as a worker
+ // so that this actor does not get block building the snapshot. THe transaction actor will
+ // after processing the CreateSnapshot message.
+
+ ActorRef createSnapshotTransaction = createTransaction(
+ TransactionProxy.TransactionType.READ_ONLY.ordinal(),
+ "createSnapshot" + ++createSnapshotTransactionCounter, "",
+ DataStoreVersions.CURRENT_VERSION);
+
+ createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
+ }
+
+ @VisibleForTesting
+ @Override
+ protected void applySnapshot(final byte[] snapshotBytes) {
+ // Since this will be done only on Recovery or when this actor is a Follower
+ // we can safely commit everything in here. We not need to worry about event notifications
+ // as they would have already been disabled on the follower
+
+ LOG.info("{}: Applying snapshot", persistenceId());
+ try {
+ DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+ // delete everything first
+ transaction.delete(DATASTORE_ROOT);
+
+ // Add everything from the remote node back
+ transaction.write(DATASTORE_ROOT, node);
+ syncCommitTransaction(transaction);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
+ } finally {
+ LOG.info("{}: Done applying snapshot", persistenceId());
+ }
+ }
+
+ @Override
+ protected void onStateChanged() {
+ boolean isLeader = isLeader();
+ for (ActorSelection dataChangeListener : dataChangeListeners) {
+ dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+ }
+
+ if(isLeader) {
+ for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+ if(!reg.isClosed()) {
+ reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+ }
+ }
+
+ delayedListenerRegistrations.clear();
+ }
+
+ shardMBean.setRaftState(getRaftState().name());
+ shardMBean.setCurrentTerm(getCurrentTerm());
+
+ // If this actor is no longer the leader close all the transaction chains
+ if(!isLeader){
+ for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());
+ }
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
+ }
+
+ @Override
+ protected DataPersistenceProvider persistence() {
+ return dataPersistenceProvider;
+ }
+
+ @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+ shardMBean.setLeader(newLeader);