Merge "Dummy Distributed Data Store for testing replication"
[controller.git] / opendaylight / md-sal / sal-dummy-distributed-datastore / src / main / java / org / opendaylight / controller / dummy / datastore / DummyShard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.dummy.datastore;
10
11 import akka.actor.Props;
12 import akka.actor.UntypedActor;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
15 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
16 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
18 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 public class DummyShard extends UntypedActor{
24     private final Configuration configuration;
25     private final String followerId;
26     private final Logger LOG = LoggerFactory.getLogger(DummyShard.class);
27
28     public DummyShard(Configuration configuration, String followerId) {
29         this.configuration = configuration;
30         this.followerId = followerId;
31         LOG.info("Creating : {}", followerId);
32     }
33
34     @Override
35     public void onReceive(Object o) throws Exception {
36         if(o instanceof RequestVote){
37             RequestVote req = (RequestVote) o;
38             sender().tell(new RequestVoteReply(req.getTerm(), true), self());
39         } else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
40             AppendEntries req = AppendEntries.fromSerializable(o);
41             handleAppendEntries(req);
42         } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
43             InstallSnapshot req = InstallSnapshot.fromSerializable(o);
44             handleInstallSnapshot(req);
45         } else if(o instanceof InstallSnapshot){
46             handleInstallSnapshot((InstallSnapshot) o);
47         } else {
48             LOG.error("Unknown message : {}", o.getClass());
49         }
50     }
51
52     private void handleInstallSnapshot(InstallSnapshot req) {
53         sender().tell(new InstallSnapshotReply(req.getTerm(), followerId, req.getChunkIndex(), true), self());
54     }
55
56     protected void handleAppendEntries(AppendEntries req) throws InterruptedException {
57         LOG.info("{} - Received AppendEntries message : leader term, index, size = {}, {}, {}", followerId, req.getTerm(),req.getLeaderCommit(), req.getEntries().size());
58         long lastIndex = req.getLeaderCommit();
59         if (req.getEntries().size() > 0)
60             lastIndex = req.getEntries().get(0).getIndex();
61
62         if (configuration.shouldCauseTrouble()) {
63             boolean ignore = false;
64
65             if (configuration.shouldDropReplies()) {
66                 ignore = Math.random() > 0.5;
67             }
68
69             long delay = (long) (Math.random() * configuration.getMaxDelayInMillis());
70
71             if (!ignore) {
72                 LOG.info("{} - Randomizing delay : {}", followerId, delay);
73                 Thread.sleep(delay);
74                 sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
75             }
76         } else {
77             sender().tell(new AppendEntriesReply(followerId, req.getTerm(), true, lastIndex, req.getTerm()), self());
78         }
79     }
80
81     public static Props props(Configuration configuration, final String followerId) {
82
83         return Props.create(new DummyShardCreator(configuration, followerId));
84     }
85
86     private static class DummyShardCreator implements Creator<DummyShard> {
87
88         private static final long serialVersionUID = 1L;
89         private final Configuration configuration;
90         private final String followerId;
91
92         DummyShardCreator(Configuration configuration, String followerId) {
93             this.configuration = configuration;
94             this.followerId = followerId;
95         }
96
97         @Override
98         public DummyShard create() throws Exception {
99             return new DummyShard(configuration, followerId);
100         }
101     }
102
103 }