Remove use of {String,UUID}Identifier
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36 import org.opendaylight.yangtools.concepts.Identifier;
37 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
38
39 /**
40  * A sample actor showing how the RaftActor is to be extended
41  */
42 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
43     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
44         private static final long serialVersionUID = 1L;
45
46         PayloadIdentifier(final long identifier) {
47             super(String.valueOf(identifier));
48         }
49     }
50
51     private final Map<String, String> state = new HashMap<>();
52
53     private long persistIdentifier = 1;
54     private final Optional<ActorRef> roleChangeNotifier;
55
56
57     public ExampleActor(String id, Map<String, String> peerAddresses,
58         Optional<ConfigParams> configParams) {
59         super(id, peerAddresses, configParams, (short)0);
60         setPersistence(true);
61         roleChangeNotifier = createRoleChangeNotifier(id);
62     }
63
64     public static Props props(final String id, final Map<String, String> peerAddresses,
65             final Optional<ConfigParams> configParams) {
66         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
67     }
68
69     @Override
70     protected void handleNonRaftCommand(Object message) {
71         if(message instanceof KeyValue){
72             if(isLeader()) {
73                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message);
74             } else {
75                 if(getLeader() != null) {
76                     getLeader().forward(message, getContext());
77                 }
78             }
79
80         } else if (message instanceof PrintState) {
81             if(LOG.isDebugEnabled()) {
82                 LOG.debug("State of the node:{} has entries={}, {}",
83                     getId(), state.size(), getReplicatedLogState());
84             }
85
86         } else if (message instanceof PrintRole) {
87             if(LOG.isDebugEnabled()) {
88                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
89                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
90                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
91                         getRaftActorContext().getPeerIds(), followers);
92                 } else {
93                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
94                         getRaftActorContext().getPeerIds());
95                 }
96
97
98             }
99
100         } else {
101             super.handleNonRaftCommand(message);
102         }
103     }
104
105     protected String getReplicatedLogState() {
106         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
107             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
108             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
109     }
110
111     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
112         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
113             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
114         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
115     }
116
117     @Override
118     protected Optional<ActorRef> getRoleChangeNotifier() {
119         return roleChangeNotifier;
120     }
121
122     @Override
123     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
124         if(data instanceof KeyValue){
125             KeyValue kv = (KeyValue) data;
126             state.put(kv.getKey(), kv.getValue());
127             if(clientActor != null) {
128                 clientActor.tell(new KeyValueSaved(), getSelf());
129             }
130         }
131     }
132
133     @Override
134     public void createSnapshot(ActorRef actorRef) {
135         ByteString bs = null;
136         try {
137             bs = fromObject(state);
138         } catch (Exception e) {
139             LOG.error("Exception in creating snapshot", e);
140         }
141         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
142     }
143
144     @Override
145     public void applySnapshot(byte [] snapshot) {
146         state.clear();
147         try {
148             state.putAll((HashMap<String, String>) toObject(snapshot));
149         } catch (Exception e) {
150            LOG.error("Exception in applying snapshot", e);
151         }
152         if(LOG.isDebugEnabled()) {
153             LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
154         }
155     }
156
157     private static ByteString fromObject(Object snapshot) throws Exception {
158         ByteArrayOutputStream b = null;
159         ObjectOutputStream o = null;
160         try {
161             b = new ByteArrayOutputStream();
162             o = new ObjectOutputStream(b);
163             o.writeObject(snapshot);
164             byte[] snapshotBytes = b.toByteArray();
165             return ByteString.copyFrom(snapshotBytes);
166         } finally {
167             if (o != null) {
168                 o.flush();
169                 o.close();
170             }
171             if (b != null) {
172                 b.close();
173             }
174         }
175     }
176
177     private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
178         Object obj = null;
179         ByteArrayInputStream bis = null;
180         ObjectInputStream ois = null;
181         try {
182             bis = new ByteArrayInputStream(bs);
183             ois = new ObjectInputStream(bis);
184             obj = ois.readObject();
185         } finally {
186             if (bis != null) {
187                 bis.close();
188             }
189             if (ois != null) {
190                 ois.close();
191             }
192         }
193         return obj;
194     }
195
196     @Override protected void onStateChanged() {
197
198     }
199
200     @Override public String persistenceId() {
201         return getId();
202     }
203
204     @Override
205     @Nonnull
206     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
207         return this;
208     }
209
210     @Override
211     public void startLogRecoveryBatch(int maxBatchSize) {
212     }
213
214     @Override
215     public void appendRecoveredLogEntry(Payload data) {
216     }
217
218     @Override
219     public void applyCurrentLogRecoveryBatch() {
220     }
221
222     @Override
223     public void onRecoveryComplete() {
224     }
225
226     @Override
227     public void applyRecoverySnapshot(byte[] snapshot) {
228     }
229
230     @Override
231     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
232         return this;
233     }
234
235     @Override
236     public byte[] getRestoreFromSnapshot() {
237         return null;
238     }
239 }