2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
29 import org.opendaylight.controller.cluster.raft.ConfigParams;
30 import org.opendaylight.controller.cluster.raft.RaftActor;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
33 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
37 * A sample actor showing how the RaftActor is to be extended
39 public class ExampleActor extends RaftActor {
41 private final Map<String, String> state = new HashMap();
42 private final DataPersistenceProvider dataPersistenceProvider;
44 private long persistIdentifier = 1;
45 private Optional<ActorRef> roleChangeNotifier;
/**
 * Creates the example actor. The three arguments are handed to the superclass
 * constructor unchanged; a new {@code PersistentDataProvider} becomes the
 * persistence provider and the result of {@code createRoleChangeNotifier(id)}
 * is stored as the role-change notifier.
 *
 * @param id passed to the superclass and to {@code createRoleChangeNotifier}
 * @param peerAddresses passed to the superclass
 * @param configParams passed to the superclass
 */
48 public ExampleActor(String id, Map<String, String> peerAddresses,
49 Optional<ConfigParams> configParams) {
50 super(id, peerAddresses, configParams);
51 this.dataPersistenceProvider = new PersistentDataProvider();
// NOTE(review): createRoleChangeNotifier is public and not final, so a subclass
// override would be invoked here, before the subclass constructor has run.
52 roleChangeNotifier = createRoleChangeNotifier(id);
/**
 * Builds the {@code Props} used to start an {@code ExampleActor}. An anonymous
 * {@code Creator} captures the three (final) arguments and passes them to the
 * {@code ExampleActor} constructor when its {@code create()} method is called.
 *
 * NOTE(review): this listing skips one source line inside the anonymous class
 * and the closing lines of the method; only the visible statements are
 * described here.
 *
 * @param id forwarded to the {@code ExampleActor} constructor
 * @param peerAddresses forwarded to the {@code ExampleActor} constructor
 * @param configParams forwarded to the {@code ExampleActor} constructor
 * @return the value returned by {@code Props.create} for that creator
 */
55 public static Props props(final String id, final Map<String, String> peerAddresses,
56 final Optional<ConfigParams> configParams){
57 return Props.create(new Creator<ExampleActor>(){
59 @Override public ExampleActor create() throws Exception {
60 return new ExampleActor(id, peerAddresses, configParams);
/**
 * Handles an incoming command according to its type. The visible branches
 * test for {@code KeyValue}, {@code PrintState} and {@code PrintRole}; a call
 * to {@code super.onReceiveCommand(message)} is also present.
 *
 * NOTE(review): this listing omits several source lines of the method (branch
 * headers and closing braces). In particular the condition that separates
 * persisting a {@code KeyValue} from forwarding it to the leader is not
 * visible, so the exact nesting must be confirmed against the full file.
 *
 * @param message the command to handle
 * @throws Exception declared by the signature
 */
65 @Override public void onReceiveCommand(Object message) throws Exception{
66 if(message instanceof KeyValue){
// The current persistIdentifier value becomes the entry's id, then the counter is incremented.
68 String persistId = Long.toString(persistIdentifier++);
69 persistData(getSender(), persistId, (Payload) message);
// Forwarding happens only when a leader is known (getLeader() is non-null).
71 if(getLeader() != null) {
72 getLeader().forward(message, getContext());
76 } else if (message instanceof PrintState) {
// Debug-level dump of the number of entries in state plus the replicated-log summary.
77 if(LOG.isDebugEnabled()) {
78 LOG.debug("State of the node:{} has entries={}, {}",
79 getId(), state.size(), getReplicatedLogState());
82 } else if (message instanceof PrintRole) {
83 if(LOG.isDebugEnabled()) {
84 String followers = "";
// Follower states are fetched only in the Leader / IsolatedLeader states.
// NOTE(review): the cast assumes the current behavior is a Leader (or subclass) in both states - verify.
85 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
86 followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
87 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
88 getRaftActorContext().getPeerAddresses().keySet(), followers);
// NOTE(review): presumably the else branch for the other Raft states; its header line is not in this listing.
90 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
91 getRaftActorContext().getPeerAddresses().keySet());
// NOTE(review): presumably the fallback for every other message type; the enclosing else is not in this listing.
98 super.onReceiveCommand(message);
102 protected String getReplicatedLogState() {
103 return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
104 + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
105 + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
108 public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
109 ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
110 RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
111 return Optional.<ActorRef>of(exampleRoleChangeNotifier);
115 protected Optional<ActorRef> getRoleChangeNotifier() {
116 return roleChangeNotifier;
/**
 * Applies an entry to the local map. When {@code data} is a {@code KeyValue},
 * its key and value are stored in {@code state}; if {@code clientActor} is
 * non-null it is then sent a {@code KeyValueSaved} with this actor as sender.
 *
 * NOTE(review): the signature line that declares {@code data} and the closing
 * lines of the method are missing from this listing. {@code identifier} is
 * not used in the visible lines.
 *
 * @param clientActor actor to notify after a {@code KeyValue} is stored; may be null
 * @param identifier not referenced in the visible body
 */
119 @Override protected void applyState(final ActorRef clientActor, final String identifier,
121 if(data instanceof KeyValue){
122 KeyValue kv = (KeyValue) data;
123 state.put(kv.getKey(), kv.getValue());
124 if(clientActor != null) {
125 clientActor.tell(new KeyValueSaved(), getSelf());
/**
 * Serializes {@code state} with {@code fromObject} and sends the resulting
 * {@code ByteString} to this actor wrapped in a {@code CaptureSnapshotReply}.
 * A serialization failure is logged rather than propagated.
 */
130 @Override protected void createSnapshot() {
131 ByteString bs = null;
// NOTE(review): the catch below implies a try around this call; the try line is not in this listing.
133 bs = fromObject(state);
134 } catch (Exception e) {
135 LOG.error(e, "Exception in creating snapshot");
// NOTE(review): this appears to run after the try/catch, in which case a failed
// serialization sends a reply carrying null - confirm the receiver tolerates that.
// The sender argument is a literal null; Akka provides ActorRef.noSender() to express this.
137 getSelf().tell(new CaptureSnapshotReply(bs), null);
/**
 * Loads a serialized snapshot into {@code state}: the bytes are deserialized
 * with {@code toObject}, cast to {@code HashMap} and copied in with
 * {@code putAll}. A failure is logged rather than propagated. At debug level
 * the resulting number of entries is logged.
 *
 * NOTE(review): two source lines between the signature and the putAll call
 * are missing from this listing, so whether the map is emptied first cannot
 * be confirmed from here.
 *
 * @param snapshot serialized form of the map, as produced by {@code fromObject}
 */
140 @Override protected void applySnapshot(ByteString snapshot) {
// Raw HashMap cast: the key/value types of the deserialized map are not checked.
143 state.putAll((HashMap) toObject(snapshot));
144 } catch (Exception e) {
145 LOG.error(e, "Exception in applying snapshot");
147 if(LOG.isDebugEnabled()) {
// The (HashMap) cast is not required to call size(); state is declared as a Map.
148 LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
/**
 * Serializes {@code snapshot} with Java object serialization into an
 * in-memory buffer and returns a copy of the bytes as a {@code ByteString}.
 *
 * NOTE(review): the try line and everything after the return statement are
 * missing from this listing, so stream cleanup cannot be reviewed here.
 *
 * @param snapshot the object to serialize
 * @return the serialized bytes
 * @throws Exception declared by the signature
 */
152 private ByteString fromObject(Object snapshot) throws Exception {
153 ByteArrayOutputStream b = null;
154 ObjectOutputStream o = null;
156 b = new ByteArrayOutputStream();
157 o = new ObjectOutputStream(b);
158 o.writeObject(snapshot);
159 byte[] snapshotBytes = b.toByteArray();
160 return ByteString.copyFrom(snapshotBytes);
/**
 * Deserializes the bytes of {@code bs} with Java object deserialization.
 *
 * NOTE(review): the declaration of {@code obj}, the return statement and any
 * stream cleanup are missing from this listing.
 * NOTE(review): ObjectInputStream.readObject is unsafe on untrusted bytes -
 * confirm that snapshots only ever come from a trusted source.
 *
 * @param bs serialized object bytes
 * @throws ClassNotFoundException declared by the signature
 * @throws IOException declared by the signature
 */
172 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
174 ByteArrayInputStream bis = null;
175 ObjectInputStream ois = null;
177 bis = new ByteArrayInputStream(bs.toByteArray());
178 ois = new ObjectInputStream(bis);
179 obj = ois.readObject();
// Override taking no arguments. NOTE(review): the body is not part of this listing.
191 @Override protected void onStateChanged() {
196 protected DataPersistenceProvider persistence() {
197 return dataPersistenceProvider;
/**
 * Passes the recovery message to {@code super.onReceiveRecover}; no other
 * handling is visible in this listing.
 *
 * @param message the recovery message
 * @throws Exception declared by the signature
 */
200 @Override public void onReceiveRecover(Object message)throws Exception {
201 super.onReceiveRecover(message);
// Returns a String identifier. NOTE(review): the return statement is not part of this listing.
204 @Override public String persistenceId() {
// Takes the maximum batch size as an int. NOTE(review): the body is not part of this listing.
209 protected void startLogRecoveryBatch(int maxBatchSize) {
// Takes a single Payload, presumably one recovered log entry. NOTE(review): the body is not part of this listing.
213 protected void appendRecoveredLogEntry(Payload data) {
// Takes no arguments. NOTE(review): the body is not part of this listing.
217 protected void applyCurrentLogRecoveryBatch() {
// Takes no arguments. NOTE(review): the body is not part of this listing.
221 protected void onRecoveryComplete() {
225 protected void applyRecoverySnapshot(ByteString snapshot) {