2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Throwables;
15 import com.google.common.io.ByteSource;
16 import java.io.IOException;
17 import java.io.OutputStream;
18 import java.io.Serializable;
19 import java.util.HashMap;
21 import javax.annotation.Nonnull;
22 import org.apache.commons.lang3.SerializationUtils;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
36 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
41 * A sample actor showing how the RaftActor is to be extended
43 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
44 private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
45 private static final long serialVersionUID = 1L;
47 PayloadIdentifier(final long identifier) {
48 super(String.valueOf(identifier));
// Key/value entries held by this actor; written by applyState and applySnapshot.
52 private final Map<String, String> state = new HashMap<>();
// Counter used to build a new PayloadIdentifier for each persisted KeyValue (post-incremented on use).
54 private long persistIdentifier = 1;
// Notifier actor reference, assigned in the constructor from createRoleChangeNotifier(id).
55 private final Optional<ActorRef> roleChangeNotifier;
/**
 * Creates the example actor and its role-change notifier child.
 *
 * @param id identifier passed to the RaftActor constructor and used to name the notifier
 * @param peerAddresses peer map passed to the RaftActor constructor
 *        (presumably peer id to address; confirm against RaftActor)
 * @param configParams optional configuration passed to the RaftActor constructor
 */
58 public ExampleActor(String id, Map<String, String> peerAddresses,
59 Optional<ConfigParams> configParams) {
// NOTE(review): the meaning of the trailing (short)0 argument is defined by RaftActor — presumably a payload version; confirm.
60 super(id, peerAddresses, configParams, (short)0);
// NOTE(review): createRoleChangeNotifier is public and non-final, so a subclass override would run
// here before that subclass has finished initialising.
62 roleChangeNotifier = createRoleChangeNotifier(id);
65 public static Props props(final String id, final Map<String, String> peerAddresses,
66 final Optional<ConfigParams> configParams) {
67 return Props.create(ExampleActor.class, id, peerAddresses, configParams);
/**
 * Handles messages that are not part of the Raft protocol itself.
 *
 * KeyValue messages are persisted through persistData or forwarded to the
 * current leader; PrintState and PrintRole messages produce debug logging;
 * the superclass implementation is also invoked (presumably only for any
 * other message type; confirm).
 *
 * @param message the incoming message
 */
71 protected void handleNonRaftCommand(Object message) {
72 if(message instanceof KeyValue){
// NOTE(review): the condition guarding this persist call is not visible on these lines —
// presumably a leadership check; confirm.
// Each persisted payload gets a new PayloadIdentifier built from the post-incremented counter.
74 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
// When a leader is known, hand the message over to it with forward().
76 if(getLeader() != null) {
77 getLeader().forward(message, getContext());
81 } else if (message instanceof PrintState) {
// Logs the node id, the number of stored entries and the replicated-log summary at debug level.
82 if(LOG.isDebugEnabled()) {
83 LOG.debug("State of the node:{} has entries={}, {}",
84 getId(), state.size(), getReplicatedLogState());
87 } else if (message instanceof PrintRole) {
88 if(LOG.isDebugEnabled()) {
// In either leader state the current behavior is cast to Leader so follower states can be printed.
89 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
90 final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
91 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
92 getRaftActorContext().getPeerIds(), followers);
// Variant of the role log line without follower details.
94 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
95 getRaftActorContext().getPeerIds());
// Delegates the message to the RaftActor implementation.
102 super.handleNonRaftCommand(message);
106 protected String getReplicatedLogState() {
107 return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
108 + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
109 + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
112 public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
113 ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
114 RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
115 return Optional.<ActorRef>of(exampleRoleChangeNotifier);
119 protected Optional<ActorRef> getRoleChangeNotifier() {
120 return roleChangeNotifier;
124 protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
125 if(data instanceof KeyValue){
126 KeyValue kv = (KeyValue) data;
127 state.put(kv.getKey(), kv.getValue());
128 if(clientActor != null) {
129 clientActor.tell(new KeyValueSaved(), getSelf());
135 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
137 if (installSnapshotStream.isPresent()) {
138 SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
140 } catch (Exception e) {
141 LOG.error("Exception in creating snapshot", e);
144 getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
/**
 * Loads the entries of the given snapshot into this actor's key/value state.
 *
 * @param snapshotState snapshot to apply; it is cast to MapState without a prior type check
 */
148 public void applySnapshot(Snapshot.State snapshotState) {
// Copies every entry of the snapshot's map into the live state map.
// NOTE(review): whether the live map is emptied beforehand is not visible on these lines — confirm.
151 state.putAll(((MapState)snapshotState).state);
// Any exception from the statement above (including a failed MapState cast) is logged and not rethrown.
152 } catch (Exception e) {
153 LOG.error("Exception in applying snapshot", e);
155 if(LOG.isDebugEnabled()) {
// NOTE(review): the HashMap cast is not needed to call size(); the Map interface already declares it.
156 LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
// State-change callback overridden from a supertype; takes no arguments and returns nothing.
160 @Override protected void onStateChanged() {
// Returns the persistence id string of this actor.
// NOTE(review): presumably derived from getId() — confirm.
164 @Override public String persistenceId() {
// Supplies the RaftActorRecoveryCohort for this actor.
// NOTE(review): this class implements RaftActorRecoveryCohort itself, so this presumably returns this — confirm.
170 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
// Recovery-cohort hook that receives the maximum size of a log recovery batch.
175 public void startLogRecoveryBatch(int maxBatchSize) {
// Recovery-cohort hook that receives one recovered log entry payload.
179 public void appendRecoveredLogEntry(Payload data) {
// Recovery-cohort hook for applying the current log recovery batch; takes no arguments.
183 public void applyCurrentLogRecoveryBatch() {
// Hook invoked on recovery completion; takes no arguments and returns nothing.
187 public void onRecoveryComplete() {
// Recovery-cohort hook that receives a snapshot state recovered from persistence.
191 public void applyRecoverySnapshot(Snapshot.State snapshotState) {
// Supplies the RaftActorSnapshotCohort for this actor.
// NOTE(review): this class implements RaftActorSnapshotCohort itself, so this presumably returns this — confirm.
195 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
// Returns the snapshot bytes to restore from, as a byte array.
// NOTE(review): whether this may return null is not visible here — confirm.
200 public byte[] getRestoreFromSnapshot() {
205 public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
207 return deserializePreCarbonSnapshot(snapshotBytes.read());
208 } catch (IOException e) {
209 throw Throwables.propagate(e);
213 @SuppressWarnings("unchecked")
215 public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
216 return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
219 private static class MapState implements Snapshot.State {
220 private static final long serialVersionUID = 1L;
222 Map<String, String> state;
224 MapState(Map<String, String> state) {