/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
 * A sample actor showing how {@link RaftActor} is meant to be extended.
 */
38 public class ExampleActor extends RaftActor {
// Key/value entries held by this actor; written in applyState and
// serialized/deserialized when snapshots are created or applied.
private final Map<String, String> state = new HashMap<>();
// Provider handed out by persistence(); assigned once in the constructor.
private final DataPersistenceProvider dataPersistenceProvider;

// Counter whose successive values (as strings) become the identifier
// passed to persistData for each KeyValue message.
private long persistIdentifier = 1;
/**
 * Creates the actor, passing all arguments through to the RaftActor
 * constructor and installing a PersistentDataProvider as the provider
 * returned by persistence().
 *
 * @param id            passed to the RaftActor constructor
 * @param peerAddresses passed to the RaftActor constructor
 * @param configParams  passed to the RaftActor constructor
 */
public ExampleActor(
        final String id,
        final Map<String, String> peerAddresses,
        final Optional<ConfigParams> configParams) {
    super(id, peerAddresses, configParams);
    dataPersistenceProvider = new PersistentDataProvider();
}
52 public static Props props(final String id, final Map<String, String> peerAddresses,
53 final Optional<ConfigParams> configParams){
54 return Props.create(new Creator<ExampleActor>(){
56 @Override public ExampleActor create() throws Exception {
57 return new ExampleActor(id, peerAddresses, configParams);
62 @Override public void onReceiveCommand(final Object message) throws Exception{
63 if(message instanceof KeyValue){
65 String persistId = Long.toString(persistIdentifier++);
66 persistData(getSender(), persistId, (Payload) message);
68 if(getLeader() != null) {
69 getLeader().forward(message, getContext());
73 } else if (message instanceof PrintState) {
74 if(LOG.isDebugEnabled()) {
75 LOG.debug("State of the node:{} has entries={}, {}",
76 getId(), state.size(), getReplicatedLogState());
79 } else if (message instanceof PrintRole) {
80 if(LOG.isDebugEnabled()) {
81 String followers = "";
82 if (getRaftState() == RaftState.Leader) {
83 followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
84 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
86 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
93 super.onReceiveCommand(message);
97 @Override protected void applyState(final ActorRef clientActor, final String identifier,
99 if(data instanceof KeyValue){
100 KeyValue kv = (KeyValue) data;
101 state.put(kv.getKey(), kv.getValue());
102 if(clientActor != null) {
103 clientActor.tell(new KeyValueSaved(), getSelf());
108 @Override protected void createSnapshot() {
109 ByteString bs = null;
111 bs = fromObject(state);
112 } catch (Exception e) {
113 LOG.error(e, "Exception in creating snapshot");
115 getSelf().tell(new CaptureSnapshotReply(bs), null);
118 @Override protected void applySnapshot(final ByteString snapshot) {
121 state.putAll((Map<String, String>) toObject(snapshot));
122 } catch (Exception e) {
123 LOG.error(e, "Exception in applying snapshot");
125 if(LOG.isDebugEnabled()) {
126 LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
130 private ByteString fromObject(final Object snapshot) throws Exception {
131 ByteArrayOutputStream b = null;
132 ObjectOutputStream o = null;
134 b = new ByteArrayOutputStream();
135 o = new ObjectOutputStream(b);
136 o.writeObject(snapshot);
137 byte[] snapshotBytes = b.toByteArray();
138 return ByteString.copyFrom(snapshotBytes);
150 private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
152 ByteArrayInputStream bis = null;
153 ObjectInputStream ois = null;
155 bis = new ByteArrayInputStream(bs.toByteArray());
156 ois = new ObjectInputStream(bis);
157 obj = ois.readObject();
169 @Override protected void onStateChanged() {
174 protected DataPersistenceProvider persistence() {
175 return dataPersistenceProvider;
178 @Override public void onReceiveRecover(final Object message)throws Exception {
179 super.onReceiveRecover(message);
182 @Override public String persistenceId() {
187 protected void startLogRecoveryBatch(final int maxBatchSize) {
191 protected void appendRecoveredLogEntry(final Payload data) {
195 protected void applyCurrentLogRecoveryBatch() {
199 protected void onRecoveryComplete() {
203 protected void applyRecoverySnapshot(final ByteString snapshot) {