Cleanup: Remove passing around of DataPersistenceProvider
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36
37 /**
38  * A sample actor showing how the RaftActor is to be extended
39  */
40 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
41
42     private final Map<String, String> state = new HashMap<>();
43
44     private long persistIdentifier = 1;
45     private final Optional<ActorRef> roleChangeNotifier;
46
47
48     public ExampleActor(String id, Map<String, String> peerAddresses,
49         Optional<ConfigParams> configParams) {
50         super(id, peerAddresses, configParams);
51         setPersistence(true);
52         roleChangeNotifier = createRoleChangeNotifier(id);
53     }
54
55     public static Props props(final String id, final Map<String, String> peerAddresses,
56             final Optional<ConfigParams> configParams) {
57         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
58     }
59
60     @Override public void onReceiveCommand(Object message) throws Exception{
61         if(message instanceof KeyValue){
62             if(isLeader()) {
63                 String persistId = Long.toString(persistIdentifier++);
64                 persistData(getSender(), persistId, (Payload) message);
65             } else {
66                 if(getLeader() != null) {
67                     getLeader().forward(message, getContext());
68                 }
69             }
70
71         } else if (message instanceof PrintState) {
72             if(LOG.isDebugEnabled()) {
73                 LOG.debug("State of the node:{} has entries={}, {}",
74                     getId(), state.size(), getReplicatedLogState());
75             }
76
77         } else if (message instanceof PrintRole) {
78             if(LOG.isDebugEnabled()) {
79                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
80                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
81                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
82                         getRaftActorContext().getPeerAddresses().keySet(), followers);
83                 } else {
84                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
85                         getRaftActorContext().getPeerAddresses().keySet());
86                 }
87
88
89             }
90
91         } else {
92             super.onReceiveCommand(message);
93         }
94     }
95
96     protected String getReplicatedLogState() {
97         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
98             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
99             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
100     }
101
102     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
103         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
104             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
105         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
106     }
107
108     @Override
109     protected Optional<ActorRef> getRoleChangeNotifier() {
110         return roleChangeNotifier;
111     }
112
113     @Override protected void applyState(final ActorRef clientActor, final String identifier,
114         final Object data) {
115         if(data instanceof KeyValue){
116             KeyValue kv = (KeyValue) data;
117             state.put(kv.getKey(), kv.getValue());
118             if(clientActor != null) {
119                 clientActor.tell(new KeyValueSaved(), getSelf());
120             }
121         }
122     }
123
124     @Override
125     public void createSnapshot(ActorRef actorRef) {
126         ByteString bs = null;
127         try {
128             bs = fromObject(state);
129         } catch (Exception e) {
130             LOG.error("Exception in creating snapshot", e);
131         }
132         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
133     }
134
135     @Override
136     public void applySnapshot(byte [] snapshot) {
137         state.clear();
138         try {
139             state.putAll((HashMap<String, String>) toObject(snapshot));
140         } catch (Exception e) {
141            LOG.error("Exception in applying snapshot", e);
142         }
143         if(LOG.isDebugEnabled()) {
144             LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
145         }
146     }
147
148     private ByteString fromObject(Object snapshot) throws Exception {
149         ByteArrayOutputStream b = null;
150         ObjectOutputStream o = null;
151         try {
152             b = new ByteArrayOutputStream();
153             o = new ObjectOutputStream(b);
154             o.writeObject(snapshot);
155             byte[] snapshotBytes = b.toByteArray();
156             return ByteString.copyFrom(snapshotBytes);
157         } finally {
158             if (o != null) {
159                 o.flush();
160                 o.close();
161             }
162             if (b != null) {
163                 b.close();
164             }
165         }
166     }
167
168     private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
169         Object obj = null;
170         ByteArrayInputStream bis = null;
171         ObjectInputStream ois = null;
172         try {
173             bis = new ByteArrayInputStream(bs);
174             ois = new ObjectInputStream(bis);
175             obj = ois.readObject();
176         } finally {
177             if (bis != null) {
178                 bis.close();
179             }
180             if (ois != null) {
181                 ois.close();
182             }
183         }
184         return obj;
185     }
186
187     @Override protected void onStateChanged() {
188
189     }
190
191     @Override public void onReceiveRecover(Object message)throws Exception {
192         super.onReceiveRecover(message);
193     }
194
195     @Override public String persistenceId() {
196         return getId();
197     }
198
199     @Override
200     @Nonnull
201     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
202         return this;
203     }
204
205     @Override
206     public void startLogRecoveryBatch(int maxBatchSize) {
207     }
208
209     @Override
210     public void appendRecoveredLogEntry(Payload data) {
211     }
212
213     @Override
214     public void applyCurrentLogRecoveryBatch() {
215     }
216
217     @Override
218     public void onRecoveryComplete() {
219     }
220
221     @Override
222     public void applyRecoverySnapshot(byte[] snapshot) {
223     }
224
225     @Override
226     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
227         return this;
228     }
229 }