2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.example;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.OutputStream;
15 import java.io.Serializable;
16 import java.util.HashMap;
18 import java.util.Optional;
19 import org.apache.commons.lang3.SerializationUtils;
20 import org.opendaylight.controller.cluster.example.messages.KeyValue;
21 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
22 import org.opendaylight.controller.cluster.example.messages.PrintRole;
23 import org.opendaylight.controller.cluster.example.messages.PrintState;
24 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
25 import org.opendaylight.controller.cluster.raft.ConfigParams;
26 import org.opendaylight.controller.cluster.raft.RaftActor;
27 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
28 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
32 import org.opendaylight.controller.cluster.raft.messages.Payload;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.yangtools.concepts.Identifier;
35 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
38 * A sample actor showing how the RaftActor is to be extended.
40 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
41 private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
42 private static final long serialVersionUID = 1L;
44 PayloadIdentifier(final long identifier) {
45 super(String.valueOf(identifier));
// Key/value entries held by this actor: written by applyState and applySnapshot, serialized by createSnapshot.
49 private final Map<String, String> state = new HashMap<>();
// Assigned once in the constructor from createRoleChangeNotifier(id) and returned by getRoleChangeNotifier().
50 private final Optional<ActorRef> roleChangeNotifier;
// Counter consumed (post-incremented) by handleNonRaftCommand to build a new PayloadIdentifier per persistData call.
52 private long persistIdentifier = 1;
/**
 * Creates the example actor.
 *
 * <p>Passes {@code id}, {@code peerAddresses} and {@code configParams} to the superclass constructor together with
 * a {@code (short)0} fourth argument, then stores the result of {@link #createRoleChangeNotifier(String)}.
 *
 * @param id identifier handed to the superclass and used to create the role change notifier
 * @param peerAddresses handed to the superclass unchanged; presumably peer id to peer address, confirm against
 *     RaftActor
 * @param configParams optional configuration handed to the superclass unchanged
 */
54 public ExampleActor(final String id, final Map<String, String> peerAddresses,
55 final Optional<ConfigParams> configParams) {
// NOTE(review): the meaning of the (short)0 argument is defined by the RaftActor constructor; confirm there.
56 super(id, peerAddresses, configParams, (short)0);
// NOTE(review): createRoleChangeNotifier is public and non-final, so this is an overridable call made from a
// constructor; a subclass override would run before the subclass is fully initialized.
58 roleChangeNotifier = createRoleChangeNotifier(id);
61 public static Props props(final String id, final Map<String, String> peerAddresses,
62 final Optional<ConfigParams> configParams) {
63 return Props.create(ExampleActor.class, id, peerAddresses, configParams);
/**
 * Dispatches on the runtime type of {@code message}.
 *
 * <ul>
 *   <li>{@link KeyValue}: either persisted through {@code persistData} under a new {@link PayloadIdentifier}, or
 *       forwarded to the leader when {@code getLeader()} is non-null.</li>
 *   <li>{@link PrintState}: when debug logging is enabled, logs the node id, the number of entries in
 *       {@code state} and the string from {@link #getReplicatedLogState()}.</li>
 *   <li>{@link PrintRole}: when debug logging is enabled, logs the id, Raft state and peer ids, plus the follower
 *       states when the Raft state is {@code Leader} or {@code IsolatedLeader}.</li>
 *   <li>The trailing {@code super.handleNonRaftCommand(message)} call is presumably the fallback for every other
 *       message type; confirm against the full method.</li>
 * </ul>
 *
 * @param message the received message
 */
67 protected void handleNonRaftCommand(final Object message) {
68 if (message instanceof KeyValue) {
// NOTE(review): the condition selecting this persist branch over the forward branch below is not visible here.
// The sender is passed along as the client actor, and persistIdentifier is post-incremented for each call.
70 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
71 } else if (getLeader() != null) {
// forward() hands the message to the leader using this actor's context.
72 getLeader().forward(message, getContext());
75 } else if (message instanceof PrintState) {
76 if (LOG.isDebugEnabled()) {
77 LOG.debug("State of the node:{} has entries={}, {}",
78 getId(), state.size(), getReplicatedLogState());
81 } else if (message instanceof PrintRole) {
82 if (LOG.isDebugEnabled()) {
83 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
// The cast relies on the current behavior being a Leader instance in both of the states checked above.
84 final String followers = ((Leader)getCurrentBehavior()).printFollowerStates();
85 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
86 getRaftActorContext().getPeerIds(), followers);
88 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
89 getRaftActorContext().getPeerIds());
96 super.handleNonRaftCommand(message);
100 protected String getReplicatedLogState() {
101 return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
102 + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
103 + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
106 public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
107 ActorRef exampleRoleChangeNotifier = getContext().actorOf(
108 RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
109 return Optional.<ActorRef>of(exampleRoleChangeNotifier);
113 protected Optional<ActorRef> getRoleChangeNotifier() {
114 return roleChangeNotifier;
118 protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
119 if (data instanceof KeyValue kv) {
120 state.put(kv.getKey(), kv.getValue());
121 if (clientActor != null) {
122 clientActor.tell(new KeyValueSaved(), getSelf());
128 @SuppressWarnings("checkstyle:IllegalCatch")
129 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
131 if (installSnapshotStream.isPresent()) {
132 SerializationUtils.serialize((Serializable) state, installSnapshotStream.orElseThrow());
134 } catch (RuntimeException e) {
135 LOG.error("Exception in creating snapshot", e);
138 getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
/**
 * Copies every entry of the given snapshot into {@code state} and, when debug logging is enabled, logs the
 * resulting number of entries.
 *
 * @param snapshotState snapshot to apply; cast to {@link MapState} without a prior type check, so any other
 *     implementation fails with a {@link ClassCastException}
 */
142 public void applySnapshot(final Snapshot.State snapshotState) {
// NOTE(review): putAll only adds or overwrites keys. Confirm that entries absent from the snapshot are removed
// before this point; otherwise stale keys survive a snapshot install.
144 state.putAll(((MapState)snapshotState).state);
146 if (LOG.isDebugEnabled()) {
147 LOG.debug("Snapshot applied to state : {}", state.size());
152 protected void onStateChanged() {
157 public String persistenceId() {
162 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
167 public void startLogRecoveryBatch(final int maxBatchSize) {
171 public void appendRecoveredLogEntry(final Payload data) {
175 public void applyCurrentLogRecoveryBatch() {
179 public void onRecoveryComplete() {
183 public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
187 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
192 public Snapshot getRestoreFromSnapshot() {
196 @SuppressWarnings("unchecked")
198 public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
200 return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
201 } catch (IOException e) {
202 throw new IllegalStateException(e);
206 private static class MapState implements Snapshot.State {
207 private static final long serialVersionUID = 1L;
209 Map<String, String> state;
211 MapState(final Map<String, String> state) {