2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.dummy.datastore;
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
16 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
/**
 * Akka {@link UntypedActor} that acts as a stand-in Raft peer: it answers
 * RequestVote, AppendEntries and InstallSnapshot messages by sending a reply
 * back to the sender, each reply being constructed with a {@code true} flag
 * (presumably "vote granted" / "success" - confirm against the reply
 * constructors).
 *
 * NOTE(review): in this view of the file several lines appear to be missing
 * (closing braces, else-lines, possibly statements), so the comments below
 * describe only what the visible lines establish.
 */
23 public class DummyShard extends UntypedActor{
// Settings consulted in handleAppendEntries (trouble injection, reply dropping, max delay).
24 private final Configuration configuration;
// Identifier placed in AppendEntriesReply / InstallSnapshotReply and in log messages.
25 private final String followerId;
// NOTE(review): this logger is an instance field; SLF4J loggers are conventionally
// declared "private static final" so one instance is shared per class.
26 private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
/**
 * Stores the supplied configuration and follower id and logs the creation.
 *
 * @param configuration settings read later by handleAppendEntries
 * @param followerId    id reported in replies and log output
 */
28 public DummyShard(Configuration configuration, String followerId) {
29 this.configuration = configuration;
30 this.followerId = followerId;
31 LOG.info("Creating : {}", followerId);
/**
 * Dispatches an incoming message by type and replies to the sender.
 *
 * @param o the received message
 */
35 public void onReceive(Object o) throws Exception {
// RequestVote: reply with the request's own term and a true flag.
36 if(o instanceof RequestVote){
37 RequestVote req = (RequestVote) o;
38 sender().tell(new RequestVoteReply(req.getTerm(), true), self());
// AppendEntries may arrive as the legacy serializable class or as an AppendEntries
// instance; both are passed through AppendEntries.fromSerializable before handling.
39 } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
40 AppendEntries req = AppendEntries.fromSerializable(o);
41 handleAppendEntries(req);
// InstallSnapshot in its serializable form is converted first ...
42 } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
43 InstallSnapshot req = InstallSnapshot.fromSerializable(o);
44 handleInstallSnapshot(req);
// ... while an InstallSnapshot instance is handled directly.
45 } else if(o instanceof InstallSnapshot){
46 handleInstallSnapshot((InstallSnapshot) o);
// Presumably the final else branch (the else line itself is not visible here):
// anything unrecognised is only logged, no reply is sent by this statement.
48 LOG.error("Unknown message : {}", o.getClass());
/**
 * Replies to the sender with an InstallSnapshotReply carrying the request's
 * term, this actor's followerId, the request's chunk index and a true flag.
 *
 * @param req the snapshot chunk request being acknowledged
 */
52 private void handleInstallSnapshot(InstallSnapshot req) {
53 sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
/**
 * Logs the received AppendEntries and replies with an AppendEntriesReply built
 * from followerId, the request's term, a true flag, the computed lastIndex and
 * the request's term again.
 *
 * NOTE(review): no visible statement throws InterruptedException; presumably a
 * blocking call (e.g. a sleep using the computed delay) sits on a line missing
 * from this view - confirm.
 *
 * @param req the AppendEntries request
 * @throws InterruptedException declared by the signature; source not visible here
 */
56 protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
57 LOG.info("{} - Received AppendEntries message : leader term, index, size = {}, {}, {}", followerId, req.getTerm(),req.getLeaderCommit(), req.getEntries().size());
// Default to the leader's commit index when the request carries no entries.
58 long lastIndex = req.getLeaderCommit();
// NOTE(review): with entries present this takes the index of the FIRST entry
// (get(0)), although the variable is named lastIndex - confirm whether the
// last entry's index was intended. The if has no braces; only the next
// statement is conditional.
59 if (req.getEntries().size() > 0)
60 lastIndex = req.getEntries().get(0).getIndex();
// Trouble-injection path, enabled by configuration.
62 if (configuration.shouldCauseTrouble()) {
63 boolean ignore = false;
// When reply dropping is enabled, flag roughly half of the requests at random.
// NOTE(review): no visible line reads "ignore"; the check that consumes it is
// presumably on a line missing from this view.
65 if (configuration.shouldDropReplies()) {
66 ignore = Math.random() > 0.5;
// Random delay in the range [0, getMaxDelayInMillis()), truncated to whole millis.
69 long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
72 LOG.info("{} - Randomizing delay : {}", followerId, delay);
74 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
// Identical reply to the one above; presumably the non-trouble (else) branch.
77 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
/**
 * Builds the Akka Props for a DummyShard via a DummyShardCreator holding the
 * given arguments.
 *
 * @param configuration passed through to the DummyShard constructor
 * @param followerId    passed through to the DummyShard constructor
 * @return Props created from the DummyShardCreator
 */
81 public static Props props(Configuration configuration, final String followerId) {
83 return Props.create(new DummyShardCreator(configuration, followerId));
/**
 * Creator that captures the DummyShard constructor arguments and instantiates
 * the actor in create().
 */
86 private static class DummyShardCreator implements Creator<DummyShard> {
88 private static final long serialVersionUID = 1L;
89 private final Configuration configuration;
90 private final String followerId;
// Captures the arguments later handed to the DummyShard constructor.
92 DummyShardCreator(Configuration configuration, String followerId) {
93 this.configuration = configuration;
94 this.followerId = followerId;
// Returns a new DummyShard built from the captured arguments.
98 public DummyShard create() throws Exception {
99 return new DummyShard(configuration, followerId);