+ public java.util.Set<String> getPeers() {
+ return context.getPeerAddresses().keySet();
+ }
+
+ protected String getReplicatedLogState() {
+ return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
+ + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
+ + ", im-mem journal size=" + context.getReplicatedLog().size();
+ }
+
+
+ /**
+ * When a derived RaftActor needs to persist something it must call
+ * persistData.
+ *
+ * @param clientActor
+ * @param identifier
+ * @param data
+ */
+ protected void persistData(ActorRef clientActor, String identifier,
+ Payload data) {
+
+ ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+ context.getReplicatedLog().lastIndex() + 1,
+ context.getTermInformation().getCurrentTerm(), data);
+
+ LOG.debug("Persist data {}", replicatedLogEntry);
+
+ replicatedLog
+ .appendAndPersist(clientActor, identifier, replicatedLogEntry);
+ }
+
+ protected String getId() {
+ return context.getId();
+ }
+
+ /**
+ * Derived actors can call the isLeader method to check if the current
+ * RaftActor is the Leader or not
+ *
+ * @return true it this RaftActor is a Leader false otherwise
+ */
+ protected boolean isLeader() {
+ return context.getId().equals(currentBehavior.getLeaderId());
+ }
+
+ /**
+ * Derived actor can call getLeader if they need a reference to the Leader.
+ * This would be useful for example in forwarding a request to an actor
+ * which is the leader
+ *
+ * @return A reference to the leader if known, null otherwise
+ */
+ protected ActorSelection getLeader(){
+ String leaderId = currentBehavior.getLeaderId();
+ if (leaderId == null) {
+ return null;
+ }
+ String peerAddress = context.getPeerAddress(leaderId);
+ LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ return context.actorSelection(peerAddress);
+ }
+
+ protected RaftState getRaftState() {
+ return currentBehavior.state();
+ }
+
+ /**
+ * setPeerAddress sets the address of a known peer at a later time.
+ * <p>
+ * This is to account for situations where a we know that a peer
+ * exists but we do not know an address up-front. This may also be used in
+ * situations where a known peer starts off in a different location and we
+ * need to change it's address
+ * <p>
+ * Note that if the peerId does not match the list of peers passed to
+ * this actor during construction an IllegalStateException will be thrown.
+ *
+ * @param peerId
+ * @param peerAddress
+ */
+ protected void setPeerAddress(String peerId, String peerAddress){
+ context.setPeerAddress(peerId, peerAddress);
+ }
+
+
+
+ /**
+ * The applyState method will be called by the RaftActor when some data
+ * needs to be applied to the actor's state
+ *
+ * @param clientActor A reference to the client who sent this message. This
+ * is the same reference that was passed to persistData
+ * by the derived actor. clientActor may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery.
+ * @param identifier The identifier of the persisted data. This is also
+ * the same identifier that was passed to persistData by
+ * the derived actor. identifier may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery
+ * @param data A piece of data that was persisted by the persistData call.
+ * This should NEVER be null.
+ */
+ protected abstract void applyState(ActorRef clientActor, String identifier,
+ Object data);
+
+ /**
+ * This method will be called by the RaftActor when a snapshot needs to be
+ * created. The derived actor should respond with its current state.
+ * <p/>
+ * During recovery the state that is returned by the derived actor will
+ * be passed back to it by calling the applySnapshot method
+ *
+ * @return The current state of the actor
+ */
+ protected abstract Object createSnapshot();
+
+ /**
+ * This method will be called by the RaftActor during recovery to
+ * reconstruct the state of the actor.
+ * <p/>
+ * This method may also be called at any other point during normal
+ * operations when the derived actor is out of sync with it's peers
+ * and the only way to bring it in sync is by applying a snapshot
+ *
+ * @param snapshot A snapshot of the state of the actor
+ */
+ protected abstract void applySnapshot(Object snapshot);
+
+ /**
+ * This method will be called by the RaftActor when the state of the
+ * RaftActor changes. The derived actor can then use methods like
+ * isLeader or getLeader to do something useful
+ */
+ protected abstract void onStateChanged();
+
+ private RaftActorBehavior switchBehavior(RaftState state) {
+ if (currentBehavior != null) {
+ if (currentBehavior.state() == state) {
+ return currentBehavior;
+ }
+ LOG.info("Switching from state " + currentBehavior.state() + " to "
+ + state);
+
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.error(e,
+ "Failed to close behavior : " + currentBehavior.state());
+ }
+
+ } else {
+ LOG.info("Switching behavior to " + state);
+ }