5d2d17122516ae5bb3a108951aed7331d99c8cdf
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.example;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import com.google.common.base.Throwables;
13 import com.google.common.io.ByteSource;
14 import java.io.IOException;
15 import java.io.OutputStream;
16 import java.io.Serializable;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.Optional;
20 import org.apache.commons.lang3.SerializationUtils;
21 import org.opendaylight.controller.cluster.example.messages.KeyValue;
22 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
23 import org.opendaylight.controller.cluster.example.messages.PrintRole;
24 import org.opendaylight.controller.cluster.example.messages.PrintState;
25 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
26 import org.opendaylight.controller.cluster.raft.ConfigParams;
27 import org.opendaylight.controller.cluster.raft.RaftActor;
28 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
29 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35 import org.opendaylight.yangtools.concepts.Identifier;
36 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
37
38 /**
39  * A sample actor showing how the RaftActor is to be extended.
40  */
41 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
42     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
43         private static final long serialVersionUID = 1L;
44
45         PayloadIdentifier(final long identifier) {
46             super(String.valueOf(identifier));
47         }
48     }
49
50     private final Map<String, String> state = new HashMap<>();
51     private final Optional<ActorRef> roleChangeNotifier;
52
53     private long persistIdentifier = 1;
54
55     public ExampleActor(final String id, final Map<String, String> peerAddresses,
56         final Optional<ConfigParams> configParams) {
57         super(id, peerAddresses, configParams, (short)0);
58         setPersistence(true);
59         roleChangeNotifier = createRoleChangeNotifier(id);
60     }
61
62     public static Props props(final String id, final Map<String, String> peerAddresses,
63             final Optional<ConfigParams> configParams) {
64         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
65     }
66
67     @Override
68     protected void handleNonRaftCommand(final Object message) {
69         if (message instanceof KeyValue) {
70             if (isLeader()) {
71                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
72             } else {
73                 if (getLeader() != null) {
74                     getLeader().forward(message, getContext());
75                 }
76             }
77
78         } else if (message instanceof PrintState) {
79             if (LOG.isDebugEnabled()) {
80                 LOG.debug("State of the node:{} has entries={}, {}",
81                     getId(), state.size(), getReplicatedLogState());
82             }
83
84         } else if (message instanceof PrintRole) {
85             if (LOG.isDebugEnabled()) {
86                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
87                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
88                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
89                         getRaftActorContext().getPeerIds(), followers);
90                 } else {
91                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
92                         getRaftActorContext().getPeerIds());
93                 }
94
95
96             }
97
98         } else {
99             super.handleNonRaftCommand(message);
100         }
101     }
102
103     protected String getReplicatedLogState() {
104         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
105             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
106             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
107     }
108
109     public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
110         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
111             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
112         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
113     }
114
115     @Override
116     protected Optional<ActorRef> getRoleChangeNotifier() {
117         return roleChangeNotifier;
118     }
119
120     @Override
121     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
122         if (data instanceof KeyValue) {
123             KeyValue kv = (KeyValue) data;
124             state.put(kv.getKey(), kv.getValue());
125             if (clientActor != null) {
126                 clientActor.tell(new KeyValueSaved(), getSelf());
127             }
128         }
129     }
130
131     @Override
132     @SuppressWarnings("checkstyle:IllegalCatch")
133     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
134         try {
135             if (installSnapshotStream.isPresent()) {
136                 SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
137             }
138         } catch (RuntimeException e) {
139             LOG.error("Exception in creating snapshot", e);
140         }
141
142         getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
143     }
144
145     @Override
146     public void applySnapshot(final Snapshot.State snapshotState) {
147         state.clear();
148         state.putAll(((MapState)snapshotState).state);
149
150         if (LOG.isDebugEnabled()) {
151             LOG.debug("Snapshot applied to state : {}", state.size());
152         }
153     }
154
155     @Override
156     protected void onStateChanged() {
157
158     }
159
160     @Override
161     public String persistenceId() {
162         return getId();
163     }
164
165     @Override
166     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
167         return this;
168     }
169
170     @Override
171     public void startLogRecoveryBatch(final int maxBatchSize) {
172     }
173
174     @Override
175     public void appendRecoveredLogEntry(final Payload data) {
176     }
177
178     @Override
179     public void applyCurrentLogRecoveryBatch() {
180     }
181
182     @Override
183     public void onRecoveryComplete() {
184     }
185
186     @Override
187     public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
188     }
189
190     @Override
191     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
192         return this;
193     }
194
195     @Override
196     public Snapshot getRestoreFromSnapshot() {
197         return null;
198     }
199
200     @SuppressWarnings("unchecked")
201     @Override
202     public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
203         try {
204             return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
205         } catch (IOException e) {
206             throw Throwables.propagate(e);
207         }
208     }
209
210     private static class MapState implements Snapshot.State {
211         private static final long serialVersionUID = 1L;
212
213         Map<String, String> state;
214
215         MapState(final Map<String, String> state) {
216             this.state = state;
217         }
218     }
219 }