2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36 import org.opendaylight.yangtools.concepts.Identifier;
37 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
40 * A sample actor showing how the RaftActor is to be extended
42 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
43 private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
44 private static final long serialVersionUID = 1L;
46 PayloadIdentifier(final long identifier) {
47 super(String.valueOf(identifier));
// Key/value pairs held by this actor; written by applyState and applySnapshot, serialised by createSnapshot.
51 private final Map<String, String> state = new HashMap<>();
// Source of PayloadIdentifier values; post-incremented on every persistData call in handleNonRaftCommand.
53 private long persistIdentifier = 1;
// Role-change notifier reference, assigned once in the constructor and returned by getRoleChangeNotifier().
54 private final Optional<ActorRef> roleChangeNotifier;
// Passes id, peer addresses and config through to the RaftActor constructor and then
// creates the role-change notifier for this actor id.
// NOTE(review): the fourth super argument, (short)0, is presumably a payload version — confirm against RaftActor.
// NOTE(review): createRoleChangeNotifier is public and overridable but is invoked from the constructor;
// an override in a subclass would run before that subclass is initialised.
57 public ExampleActor(String id, Map<String, String> peerAddresses,
58 Optional<ConfigParams> configParams) {
59 super(id, peerAddresses, configParams, (short)0);
61 roleChangeNotifier = createRoleChangeNotifier(id);
64 public static Props props(final String id, final Map<String, String> peerAddresses,
65 final Optional<ConfigParams> configParams) {
66 return Props.create(ExampleActor.class, id, peerAddresses, configParams);
// Handles the example's own messages: KeyValue writes plus the PrintState and PrintRole
// diagnostics. Any other message is passed to super.handleNonRaftCommand.
70 protected void handleNonRaftCommand(Object message) {
71 if(message instanceof KeyValue){
// Persist the KeyValue as a payload under a fresh PayloadIdentifier (the counter is post-incremented).
// The current sender is passed along; presumably it comes back as clientActor in applyState — confirm.
73 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message);
// NOTE(review): the condition choosing between persisting locally and forwarding is not evident
// from these lines — presumably persist on the leader and forward otherwise; confirm.
// When a leader is known, the message is forwarded to it.
75 if(getLeader() != null) {
76 getLeader().forward(message, getContext());
80 } else if (message instanceof PrintState) {
// Diagnostics only: emitted at debug level, so nothing is produced when debug logging is off.
81 if(LOG.isDebugEnabled()) {
82 LOG.debug("State of the node:{} has entries={}, {}",
83 getId(), state.size(), getReplicatedLogState());
86 } else if (message instanceof PrintRole) {
87 if(LOG.isDebugEnabled()) {
// The cast to Leader below is guarded by this check on the Raft state.
88 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
89 final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
90 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
91 getRaftActorContext().getPeerIds(), followers);
// Variant of the role log line without follower details.
93 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
94 getRaftActorContext().getPeerIds());
// Messages this class does not recognise are delegated to the superclass.
101 super.handleNonRaftCommand(message);
105 protected String getReplicatedLogState() {
106 return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
107 + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
108 + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
111 public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
112 ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
113 RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
114 return Optional.<ActorRef>of(exampleRoleChangeNotifier);
118 protected Optional<ActorRef> getRoleChangeNotifier() {
119 return roleChangeNotifier;
123 protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
124 if(data instanceof KeyValue){
125 KeyValue kv = (KeyValue) data;
126 state.put(kv.getKey(), kv.getValue());
127 if(clientActor != null) {
128 clientActor.tell(new KeyValueSaved(), getSelf());
134 public void createSnapshot(ActorRef actorRef) {
135 ByteString bs = null;
137 bs = fromObject(state);
138 } catch (Exception e) {
139 LOG.error("Exception in creating snapshot", e);
141 getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
145 public void applySnapshot(byte [] snapshot) {
148 state.putAll((HashMap<String, String>) toObject(snapshot));
149 } catch (Exception e) {
150 LOG.error("Exception in applying snapshot", e);
152 if(LOG.isDebugEnabled()) {
153 LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
157 private static ByteString fromObject(Object snapshot) throws Exception {
158 ByteArrayOutputStream b = null;
159 ObjectOutputStream o = null;
161 b = new ByteArrayOutputStream();
162 o = new ObjectOutputStream(b);
163 o.writeObject(snapshot);
164 byte[] snapshotBytes = b.toByteArray();
165 return ByteString.copyFrom(snapshotBytes);
177 private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
179 ByteArrayInputStream bis = null;
180 ObjectInputStream ois = null;
182 bis = new ByteArrayInputStream(bs);
183 ois = new ObjectInputStream(bis);
184 obj = ois.readObject();
// Overrides the superclass state-change hook.
196 @Override protected void onStateChanged() {
// Overrides the superclass persistence id accessor.
// NOTE(review): presumably returns this actor's id — confirm.
200 @Override public String persistenceId() {
// Returns the RaftActorRecoveryCohort for this actor.
// NOTE(review): ExampleActor itself implements RaftActorRecoveryCohort, so this presumably returns this — confirm.
206 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
// Recovery callback taking the maximum batch size.
// NOTE(review): presumably part of RaftActorRecoveryCohort — confirm.
211 public void startLogRecoveryBatch(int maxBatchSize) {
// Recovery callback receiving one recovered Payload.
// NOTE(review): presumably part of RaftActorRecoveryCohort — confirm.
215 public void appendRecoveredLogEntry(Payload data) {
// Recovery callback with no arguments.
// NOTE(review): presumably applies the entries gathered since startLogRecoveryBatch — confirm.
219 public void applyCurrentLogRecoveryBatch() {
// Callback with no arguments.
// NOTE(review): presumably invoked once recovery has finished — confirm.
223 public void onRecoveryComplete() {
// Recovery callback receiving a snapshot as raw bytes.
// NOTE(review): presumably the same serialised map format that applySnapshot consumes — confirm.
227 public void applyRecoverySnapshot(byte[] snapshot) {
// Returns the RaftActorSnapshotCohort for this actor.
// NOTE(review): ExampleActor itself implements RaftActorSnapshotCohort, so this presumably returns this — confirm.
231 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
236 public byte[] getRestoreFromSnapshot() {