663d400b53b3e3f7a589a08b819f930572d4c753
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Throwables;
15 import com.google.common.io.ByteSource;
16 import java.io.IOException;
17 import java.io.OutputStream;
18 import java.io.Serializable;
19 import java.util.HashMap;
20 import java.util.Map;
21 import javax.annotation.Nonnull;
22 import org.apache.commons.lang3.SerializationUtils;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
36 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
39
40 /**
41  * A sample actor showing how the RaftActor is to be extended
42  */
43 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
44     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
45         private static final long serialVersionUID = 1L;
46
47         PayloadIdentifier(final long identifier) {
48             super(String.valueOf(identifier));
49         }
50     }
51
52     private final Map<String, String> state = new HashMap<>();
53
54     private long persistIdentifier = 1;
55     private final Optional<ActorRef> roleChangeNotifier;
56
57
58     public ExampleActor(String id, Map<String, String> peerAddresses,
59         Optional<ConfigParams> configParams) {
60         super(id, peerAddresses, configParams, (short)0);
61         setPersistence(true);
62         roleChangeNotifier = createRoleChangeNotifier(id);
63     }
64
65     public static Props props(final String id, final Map<String, String> peerAddresses,
66             final Optional<ConfigParams> configParams) {
67         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
68     }
69
70     @Override
71     protected void handleNonRaftCommand(Object message) {
72         if(message instanceof KeyValue){
73             if(isLeader()) {
74                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
75             } else {
76                 if(getLeader() != null) {
77                     getLeader().forward(message, getContext());
78                 }
79             }
80
81         } else if (message instanceof PrintState) {
82             if(LOG.isDebugEnabled()) {
83                 LOG.debug("State of the node:{} has entries={}, {}",
84                     getId(), state.size(), getReplicatedLogState());
85             }
86
87         } else if (message instanceof PrintRole) {
88             if(LOG.isDebugEnabled()) {
89                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
90                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
91                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
92                         getRaftActorContext().getPeerIds(), followers);
93                 } else {
94                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
95                         getRaftActorContext().getPeerIds());
96                 }
97
98
99             }
100
101         } else {
102             super.handleNonRaftCommand(message);
103         }
104     }
105
106     protected String getReplicatedLogState() {
107         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
108             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
109             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
110     }
111
112     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
113         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
114             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
115         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
116     }
117
118     @Override
119     protected Optional<ActorRef> getRoleChangeNotifier() {
120         return roleChangeNotifier;
121     }
122
123     @Override
124     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
125         if(data instanceof KeyValue){
126             KeyValue kv = (KeyValue) data;
127             state.put(kv.getKey(), kv.getValue());
128             if(clientActor != null) {
129                 clientActor.tell(new KeyValueSaved(), getSelf());
130             }
131         }
132     }
133
134     @Override
135     public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
136         try {
137             if (installSnapshotStream.isPresent()) {
138                 SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
139             }
140         } catch (Exception e) {
141             LOG.error("Exception in creating snapshot", e);
142         }
143
144         getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
145     }
146
147     @Override
148     public void applySnapshot(Snapshot.State snapshotState) {
149         state.clear();
150         try {
151             state.putAll(((MapState)snapshotState).state);
152         } catch (Exception e) {
153            LOG.error("Exception in applying snapshot", e);
154         }
155         if(LOG.isDebugEnabled()) {
156             LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
157         }
158     }
159
160     @Override protected void onStateChanged() {
161
162     }
163
164     @Override public String persistenceId() {
165         return getId();
166     }
167
168     @Override
169     @Nonnull
170     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
171         return this;
172     }
173
174     @Override
175     public void startLogRecoveryBatch(int maxBatchSize) {
176     }
177
178     @Override
179     public void appendRecoveredLogEntry(Payload data) {
180     }
181
182     @Override
183     public void applyCurrentLogRecoveryBatch() {
184     }
185
186     @Override
187     public void onRecoveryComplete() {
188     }
189
190     @Override
191     public void applyRecoverySnapshot(Snapshot.State snapshotState) {
192     }
193
194     @Override
195     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
196         return this;
197     }
198
199     @Override
200     public byte[] getRestoreFromSnapshot() {
201         return null;
202     }
203
204     @Override
205     public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
206         try {
207             return deserializePreCarbonSnapshot(snapshotBytes.read());
208         } catch (IOException e) {
209             throw Throwables.propagate(e);
210         }
211     }
212
213     @SuppressWarnings("unchecked")
214     @Override
215     public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
216         return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
217     }
218
219     private static class MapState implements Snapshot.State {
220         private static final long serialVersionUID = 1L;
221
222         Map<String, String> state;
223
224         MapState(Map<String, String> state) {
225             this.state = state;
226         }
227     }
228 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.