Merge "Bug-2277 : Isolated Leader Implementation"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
22 import java.util.Map;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
34
35 /**
36  * A sample actor showing how the RaftActor is to be extended
37  */
38 public class ExampleActor extends RaftActor {
39
40     private final Map<String, String> state = new HashMap<>();
41     private final DataPersistenceProvider dataPersistenceProvider;
42
43     private long persistIdentifier = 1;
44
45
46     public ExampleActor(final String id, final Map<String, String> peerAddresses,
47         final Optional<ConfigParams> configParams) {
48         super(id, peerAddresses, configParams);
49         this.dataPersistenceProvider = new PersistentDataProvider();
50     }
51
52     public static Props props(final String id, final Map<String, String> peerAddresses,
53         final Optional<ConfigParams> configParams){
54         return Props.create(new Creator<ExampleActor>(){
55             private static final long serialVersionUID = 1L;
56
57             @Override public ExampleActor create() throws Exception {
58                 return new ExampleActor(id, peerAddresses, configParams);
59             }
60         });
61     }
62
63     @Override public void onReceiveCommand(final Object message) throws Exception{
64         if(message instanceof KeyValue){
65             if(isLeader()) {
66                 String persistId = Long.toString(persistIdentifier++);
67                 persistData(getSender(), persistId, (Payload) message);
68             } else {
69                 if(getLeader() != null) {
70                     getLeader().forward(message, getContext());
71                 }
72             }
73
74         } else if (message instanceof PrintState) {
75             if(LOG.isDebugEnabled()) {
76                 LOG.debug("State of the node:{} has entries={}, {}",
77                     getId(), state.size(), getReplicatedLogState());
78             }
79
80         } else if (message instanceof PrintRole) {
81             if(LOG.isDebugEnabled()) {
82                 String followers = "";
83                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
84                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
85                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
86                 } else {
87                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
88                 }
89
90
91             }
92
93         } else {
94             super.onReceiveCommand(message);
95         }
96     }
97
98     @Override protected void applyState(final ActorRef clientActor, final String identifier,
99         final Object data) {
100         if(data instanceof KeyValue){
101             KeyValue kv = (KeyValue) data;
102             state.put(kv.getKey(), kv.getValue());
103             if(clientActor != null) {
104                 clientActor.tell(new KeyValueSaved(), getSelf());
105             }
106         }
107     }
108
109     @Override protected void createSnapshot() {
110         ByteString bs = null;
111         try {
112             bs = fromObject(state);
113         } catch (Exception e) {
114             LOG.error(e, "Exception in creating snapshot");
115         }
116         getSelf().tell(new CaptureSnapshotReply(bs), null);
117     }
118
119     @Override protected void applySnapshot(final ByteString snapshot) {
120         state.clear();
121         try {
122             state.putAll((Map<String, String>) toObject(snapshot));
123         } catch (Exception e) {
124            LOG.error(e, "Exception in applying snapshot");
125         }
126         if(LOG.isDebugEnabled()) {
127             LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
128         }
129     }
130
131     private ByteString fromObject(final Object snapshot) throws Exception {
132         ByteArrayOutputStream b = null;
133         ObjectOutputStream o = null;
134         try {
135             b = new ByteArrayOutputStream();
136             o = new ObjectOutputStream(b);
137             o.writeObject(snapshot);
138             byte[] snapshotBytes = b.toByteArray();
139             return ByteString.copyFrom(snapshotBytes);
140         } finally {
141             if (o != null) {
142                 o.flush();
143                 o.close();
144             }
145             if (b != null) {
146                 b.close();
147             }
148         }
149     }
150
151     private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
152         Object obj = null;
153         ByteArrayInputStream bis = null;
154         ObjectInputStream ois = null;
155         try {
156             bis = new ByteArrayInputStream(bs.toByteArray());
157             ois = new ObjectInputStream(bis);
158             obj = ois.readObject();
159         } finally {
160             if (bis != null) {
161                 bis.close();
162             }
163             if (ois != null) {
164                 ois.close();
165             }
166         }
167         return obj;
168     }
169
170     @Override protected void onStateChanged() {
171
172     }
173
174     @Override
175     protected DataPersistenceProvider persistence() {
176         return dataPersistenceProvider;
177     }
178
179     @Override public void onReceiveRecover(final Object message)throws Exception {
180         super.onReceiveRecover(message);
181     }
182
183     @Override public String persistenceId() {
184         return getId();
185     }
186
187     @Override
188     protected void startLogRecoveryBatch(final int maxBatchSize) {
189     }
190
191     @Override
192     protected void appendRecoveredLogEntry(final Payload data) {
193     }
194
195     @Override
196     protected void applyCurrentLogRecoveryBatch() {
197     }
198
199     @Override
200     protected void onRecoveryComplete() {
201     }
202
203     @Override
204     protected void applyRecoverySnapshot(final ByteString snapshot) {
205     }
206 }