Update lispflowmapping options in custom.properties
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import org.opendaylight.controller.cluster.example.messages.KeyValue;
23 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
24 import org.opendaylight.controller.cluster.example.messages.PrintRole;
25 import org.opendaylight.controller.cluster.example.messages.PrintState;
26 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
27 import org.opendaylight.controller.cluster.raft.ConfigParams;
28 import org.opendaylight.controller.cluster.raft.RaftActor;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33
34 /**
35  * A sample actor showing how the RaftActor is to be extended
36  */
37 public class ExampleActor extends RaftActor {
38
39     private final Map<String, String> state = new HashMap();
40
41     private long persistIdentifier = 1;
42     private final Optional<ActorRef> roleChangeNotifier;
43
44
45     public ExampleActor(String id, Map<String, String> peerAddresses,
46         Optional<ConfigParams> configParams) {
47         super(id, peerAddresses, configParams);
48         setPersistence(true);
49         roleChangeNotifier = createRoleChangeNotifier(id);
50     }
51
52     public static Props props(final String id, final Map<String, String> peerAddresses,
53             final Optional<ConfigParams> configParams) {
54         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
55     }
56
57     @Override public void onReceiveCommand(Object message) throws Exception{
58         if(message instanceof KeyValue){
59             if(isLeader()) {
60                 String persistId = Long.toString(persistIdentifier++);
61                 persistData(getSender(), persistId, (Payload) message);
62             } else {
63                 if(getLeader() != null) {
64                     getLeader().forward(message, getContext());
65                 }
66             }
67
68         } else if (message instanceof PrintState) {
69             if(LOG.isDebugEnabled()) {
70                 LOG.debug("State of the node:{} has entries={}, {}",
71                     getId(), state.size(), getReplicatedLogState());
72             }
73
74         } else if (message instanceof PrintRole) {
75             if(LOG.isDebugEnabled()) {
76                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
77                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
78                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
79                         getRaftActorContext().getPeerAddresses().keySet(), followers);
80                 } else {
81                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
82                         getRaftActorContext().getPeerAddresses().keySet());
83                 }
84
85
86             }
87
88         } else {
89             super.onReceiveCommand(message);
90         }
91     }
92
93     protected String getReplicatedLogState() {
94         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
95             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
96             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
97     }
98
99     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
100         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
101             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
102         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
103     }
104
105     @Override
106     protected Optional<ActorRef> getRoleChangeNotifier() {
107         return roleChangeNotifier;
108     }
109
110     @Override protected void applyState(final ActorRef clientActor, final String identifier,
111         final Object data) {
112         if(data instanceof KeyValue){
113             KeyValue kv = (KeyValue) data;
114             state.put(kv.getKey(), kv.getValue());
115             if(clientActor != null) {
116                 clientActor.tell(new KeyValueSaved(), getSelf());
117             }
118         }
119     }
120
121     @Override protected void createSnapshot() {
122         ByteString bs = null;
123         try {
124             bs = fromObject(state);
125         } catch (Exception e) {
126             LOG.error("Exception in creating snapshot", e);
127         }
128         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
129     }
130
131     @Override protected void applySnapshot(byte [] snapshot) {
132         state.clear();
133         try {
134             state.putAll((HashMap) toObject(snapshot));
135         } catch (Exception e) {
136            LOG.error("Exception in applying snapshot", e);
137         }
138         if(LOG.isDebugEnabled()) {
139             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
140         }
141     }
142
143     private ByteString fromObject(Object snapshot) throws Exception {
144         ByteArrayOutputStream b = null;
145         ObjectOutputStream o = null;
146         try {
147             b = new ByteArrayOutputStream();
148             o = new ObjectOutputStream(b);
149             o.writeObject(snapshot);
150             byte[] snapshotBytes = b.toByteArray();
151             return ByteString.copyFrom(snapshotBytes);
152         } finally {
153             if (o != null) {
154                 o.flush();
155                 o.close();
156             }
157             if (b != null) {
158                 b.close();
159             }
160         }
161     }
162
163     private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
164         Object obj = null;
165         ByteArrayInputStream bis = null;
166         ObjectInputStream ois = null;
167         try {
168             bis = new ByteArrayInputStream(bs);
169             ois = new ObjectInputStream(bis);
170             obj = ois.readObject();
171         } finally {
172             if (bis != null) {
173                 bis.close();
174             }
175             if (ois != null) {
176                 ois.close();
177             }
178         }
179         return obj;
180     }
181
182     @Override protected void onStateChanged() {
183
184     }
185
186     @Override public void onReceiveRecover(Object message)throws Exception {
187         super.onReceiveRecover(message);
188     }
189
190     @Override public String persistenceId() {
191         return getId();
192     }
193
194     @Override
195     protected void startLogRecoveryBatch(int maxBatchSize) {
196     }
197
198     @Override
199     protected void appendRecoveredLogEntry(Payload data) {
200     }
201
202     @Override
203     protected void applyCurrentLogRecoveryBatch() {
204     }
205
206     @Override
207     protected void onRecoveryComplete() {
208     }
209
210     @Override
211     protected void applyRecoverySnapshot(byte[] snapshot) {
212     }
213 }