ff57bfd1d5ae21db8f8ae1646c9a8f6e1f08b3a5
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36 import org.opendaylight.yangtools.concepts.Identifier;
37 import org.opendaylight.yangtools.util.StringIdentifier;
38
39 /**
40  * A sample actor showing how the RaftActor is to be extended
41  */
42 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
43
44     private final Map<String, String> state = new HashMap<>();
45
46     private long persistIdentifier = 1;
47     private final Optional<ActorRef> roleChangeNotifier;
48
49
50     public ExampleActor(String id, Map<String, String> peerAddresses,
51         Optional<ConfigParams> configParams) {
52         super(id, peerAddresses, configParams, (short)0);
53         setPersistence(true);
54         roleChangeNotifier = createRoleChangeNotifier(id);
55     }
56
57     public static Props props(final String id, final Map<String, String> peerAddresses,
58             final Optional<ConfigParams> configParams) {
59         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
60     }
61
62     @Override
63     protected void handleNonRaftCommand(Object message) {
64         if(message instanceof KeyValue){
65             if(isLeader()) {
66                 persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message);
67             } else {
68                 if(getLeader() != null) {
69                     getLeader().forward(message, getContext());
70                 }
71             }
72
73         } else if (message instanceof PrintState) {
74             if(LOG.isDebugEnabled()) {
75                 LOG.debug("State of the node:{} has entries={}, {}",
76                     getId(), state.size(), getReplicatedLogState());
77             }
78
79         } else if (message instanceof PrintRole) {
80             if(LOG.isDebugEnabled()) {
81                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
82                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
83                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
84                         getRaftActorContext().getPeerIds(), followers);
85                 } else {
86                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
87                         getRaftActorContext().getPeerIds());
88                 }
89
90
91             }
92
93         } else {
94             super.handleNonRaftCommand(message);
95         }
96     }
97
98     protected String getReplicatedLogState() {
99         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
100             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
101             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
102     }
103
104     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
105         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
106             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
107         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
108     }
109
110     @Override
111     protected Optional<ActorRef> getRoleChangeNotifier() {
112         return roleChangeNotifier;
113     }
114
115     @Override
116     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
117         if(data instanceof KeyValue){
118             KeyValue kv = (KeyValue) data;
119             state.put(kv.getKey(), kv.getValue());
120             if(clientActor != null) {
121                 clientActor.tell(new KeyValueSaved(), getSelf());
122             }
123         }
124     }
125
126     @Override
127     public void createSnapshot(ActorRef actorRef) {
128         ByteString bs = null;
129         try {
130             bs = fromObject(state);
131         } catch (Exception e) {
132             LOG.error("Exception in creating snapshot", e);
133         }
134         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
135     }
136
137     @Override
138     public void applySnapshot(byte [] snapshot) {
139         state.clear();
140         try {
141             state.putAll((HashMap<String, String>) toObject(snapshot));
142         } catch (Exception e) {
143            LOG.error("Exception in applying snapshot", e);
144         }
145         if(LOG.isDebugEnabled()) {
146             LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
147         }
148     }
149
150     private static ByteString fromObject(Object snapshot) throws Exception {
151         ByteArrayOutputStream b = null;
152         ObjectOutputStream o = null;
153         try {
154             b = new ByteArrayOutputStream();
155             o = new ObjectOutputStream(b);
156             o.writeObject(snapshot);
157             byte[] snapshotBytes = b.toByteArray();
158             return ByteString.copyFrom(snapshotBytes);
159         } finally {
160             if (o != null) {
161                 o.flush();
162                 o.close();
163             }
164             if (b != null) {
165                 b.close();
166             }
167         }
168     }
169
170     private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
171         Object obj = null;
172         ByteArrayInputStream bis = null;
173         ObjectInputStream ois = null;
174         try {
175             bis = new ByteArrayInputStream(bs);
176             ois = new ObjectInputStream(bis);
177             obj = ois.readObject();
178         } finally {
179             if (bis != null) {
180                 bis.close();
181             }
182             if (ois != null) {
183                 ois.close();
184             }
185         }
186         return obj;
187     }
188
189     @Override protected void onStateChanged() {
190
191     }
192
193     @Override public String persistenceId() {
194         return getId();
195     }
196
197     @Override
198     @Nonnull
199     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
200         return this;
201     }
202
203     @Override
204     public void startLogRecoveryBatch(int maxBatchSize) {
205     }
206
207     @Override
208     public void appendRecoveredLogEntry(Payload data) {
209     }
210
211     @Override
212     public void applyCurrentLogRecoveryBatch() {
213     }
214
215     @Override
216     public void onRecoveryComplete() {
217     }
218
219     @Override
220     public void applyRecoverySnapshot(byte[] snapshot) {
221     }
222
223     @Override
224     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
225         return this;
226     }
227
228     @Override
229     public byte[] getRestoreFromSnapshot() {
230         return null;
231     }
232 }