applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof ReplicatedLogEntry) {
- replicatedLog.append((ReplicatedLogEntry) message);
+ ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+ // Apply State immediately
+ replicatedLog.append(logEntry);
+ applyState(null, "recovery", logEntry.getData());
+ context.setLastApplied(logEntry.getIndex());
+ context.setCommitIndex(logEntry.getIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
} else if (message instanceof RecoveryCompleted) {
LOG.debug(
"RecoveryCompleted - Switching actor to Follower - " +
- "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
createTransaction(CreateTransaction.fromSerializable(message));
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
+ } else {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ "Could not find leader so transaction cannot be created")), getSelf());
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
modification);
DOMStoreWriteTransaction transaction =
store.newWriteOnlyTransaction();
+
+ LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+
modification.apply(transaction);
try {
syncCommitTransaction(transaction);
return;
}
+
+ if(sender == null){
+ LOG.error("Commit failed. Sender cannot be null");
+ return;
+ }
+
final ListenableFuture<Void> future = cohort.commit();
final ActorRef self = getSelf();