Merge "BUG-2184 Fix config.yang module(add type as a key for modules list)"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14
15 import com.google.common.base.Optional;
16 import com.google.protobuf.ByteString;
17
18 import org.opendaylight.controller.cluster.example.messages.KeyValue;
19 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
20 import org.opendaylight.controller.cluster.example.messages.PrintRole;
21 import org.opendaylight.controller.cluster.example.messages.PrintState;
22 import org.opendaylight.controller.cluster.raft.ConfigParams;
23 import org.opendaylight.controller.cluster.raft.RaftActor;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
25 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
26
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.ObjectInputStream;
31 import java.io.ObjectOutputStream;
32 import java.util.HashMap;
33 import java.util.Map;
34
35 /**
36  * A sample actor showing how the RaftActor is to be extended
37  */
38 public class ExampleActor extends RaftActor {
39
40     private final Map<String, String> state = new HashMap();
41
42     private long persistIdentifier = 1;
43
44
45     public ExampleActor(String id, Map<String, String> peerAddresses,
46         Optional<ConfigParams> configParams) {
47         super(id, peerAddresses, configParams);
48     }
49
50     public static Props props(final String id, final Map<String, String> peerAddresses,
51         final Optional<ConfigParams> configParams){
52         return Props.create(new Creator<ExampleActor>(){
53
54             @Override public ExampleActor create() throws Exception {
55                 return new ExampleActor(id, peerAddresses, configParams);
56             }
57         });
58     }
59
60     @Override public void onReceiveCommand(Object message){
61         if(message instanceof KeyValue){
62             if(isLeader()) {
63                 String persistId = Long.toString(persistIdentifier++);
64                 persistData(getSender(), persistId, (Payload) message);
65             } else {
66                 if(getLeader() != null) {
67                     getLeader().forward(message, getContext());
68                 }
69             }
70
71         } else if (message instanceof PrintState) {
72             if(LOG.isDebugEnabled()) {
73                 LOG.debug("State of the node:{} has entries={}, {}",
74                     getId(), state.size(), getReplicatedLogState());
75             }
76
77         } else if (message instanceof PrintRole) {
78             if(LOG.isDebugEnabled()) {
79                 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
80             }
81
82         } else {
83             super.onReceiveCommand(message);
84         }
85     }
86
87     @Override protected void applyState(ActorRef clientActor, String identifier,
88         Object data) {
89         if(data instanceof KeyValue){
90             KeyValue kv = (KeyValue) data;
91             state.put(kv.getKey(), kv.getValue());
92             if(clientActor != null) {
93                 clientActor.tell(new KeyValueSaved(), getSelf());
94             }
95         }
96     }
97
98     @Override protected void createSnapshot() {
99         ByteString bs = null;
100         try {
101             bs = fromObject(state);
102         } catch (Exception e) {
103             LOG.error(e, "Exception in creating snapshot");
104         }
105         getSelf().tell(new CaptureSnapshotReply(bs), null);
106     }
107
108     @Override protected void applySnapshot(ByteString snapshot) {
109         state.clear();
110         try {
111             state.putAll((HashMap) toObject(snapshot));
112         } catch (Exception e) {
113            LOG.error(e, "Exception in applying snapshot");
114         }
115         if(LOG.isDebugEnabled()) {
116             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
117         }
118     }
119
120     private ByteString fromObject(Object snapshot) throws Exception {
121         ByteArrayOutputStream b = null;
122         ObjectOutputStream o = null;
123         try {
124             b = new ByteArrayOutputStream();
125             o = new ObjectOutputStream(b);
126             o.writeObject(snapshot);
127             byte[] snapshotBytes = b.toByteArray();
128             return ByteString.copyFrom(snapshotBytes);
129         } finally {
130             if (o != null) {
131                 o.flush();
132                 o.close();
133             }
134             if (b != null) {
135                 b.close();
136             }
137         }
138     }
139
140     private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
141         Object obj = null;
142         ByteArrayInputStream bis = null;
143         ObjectInputStream ois = null;
144         try {
145             bis = new ByteArrayInputStream(bs.toByteArray());
146             ois = new ObjectInputStream(bis);
147             obj = ois.readObject();
148         } finally {
149             if (bis != null) {
150                 bis.close();
151             }
152             if (ois != null) {
153                 ois.close();
154             }
155         }
156         return obj;
157     }
158
159     @Override protected void onStateChanged() {
160
161     }
162
163     @Override public void onReceiveRecover(Object message) {
164         super.onReceiveRecover(message);
165     }
166
167     @Override public String persistenceId() {
168         return getId();
169     }
170
171     @Override
172     protected void startLogRecoveryBatch(int maxBatchSize) {
173     }
174
175     @Override
176     protected void appendRecoveredLogEntry(Payload data) {
177     }
178
179     @Override
180     protected void applyCurrentLogRecoveryBatch() {
181     }
182
183     @Override
184     protected void onRecoveryComplete() {
185     }
186
187     @Override
188     protected void applyRecoverySnapshot(ByteString snapshot) {
189     }
190 }