- LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- persistenceId(), data, data.getClass().getClassLoader(),
- CompositeModificationPayload.class.getClassLoader());
- }
-
- updateJournalStats();
-
- }
-
- private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
- if(modification == null) {
- LOG.error(
- "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
- } else if(clientActor == null) {
- // There's no clientActor to which to send a commit reply so we must be applying
- // replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
- } else {
- // This must be the OK to commit after replication consensus.
- finishCommit(clientActor, identifier);
- }
- }
-
- private void updateJournalStats() {
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-
- if (lastLogEntry != null) {
- shardMBean.setLastLogIndex(lastLogEntry.getIndex());
- shardMBean.setLastLogTerm(lastLogEntry.getTerm());
- }
-
- shardMBean.setCommitIndex(getCommitIndex());
- shardMBean.setLastApplied(getLastApplied());
- shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
- }
-
- @Override
- protected void createSnapshot() {
- // Create a transaction actor. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot. THe transaction actor will
- // after processing the CreateSnapshot message.
-
- ActorRef createSnapshotTransaction = createTransaction(
- TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "",
- DataStoreVersions.CURRENT_VERSION);
-
- createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
- }
-
- @VisibleForTesting
- @Override
- protected void applySnapshot(final byte[] snapshotBytes) {
- // Since this will be done only on Recovery or when this actor is a Follower
- // we can safely commit everything in here. We not need to worry about event notifications
- // as they would have already been disabled on the follower
-
- LOG.info("{}: Applying snapshot", persistenceId());
- try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- transaction.delete(DATASTORE_ROOT);
-
- // Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
- } finally {
- LOG.info("{}: Done applying snapshot", persistenceId());