59dd29b3142f5c8f80ad81dce17fa859c382eb57
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStoreTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.Status.Success;
17 import akka.pattern.Patterns;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.testkit.JavaTestKit;
20 import akka.util.Timeout;
21 import com.typesafe.config.ConfigFactory;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.After;
29 import org.junit.AfterClass;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
35 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import scala.concurrent.Await;
42 import scala.concurrent.duration.Duration;
43
44 public class BucketStoreTest {
45
46     /**
47      * Dummy class to eliminate rawtype warnings.
48      *
49      * @author gwu
50      *
51      */
52     private static class T implements BucketData<T> {
53         @Override
54         public Optional<ActorRef> getWatchActor() {
55             return Optional.empty();
56         }
57     }
58
59     private static ActorSystem system;
60
61     private JavaTestKit kit;
62
63     @BeforeClass
64     public static void setup() {
65         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
66         system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
67     }
68
69     @AfterClass
70     public static void teardown() {
71         JavaTestKit.shutdownActorSystem(system);
72     }
73
74     @Before
75     public void before() {
76         kit = new JavaTestKit(system);
77     }
78
79     @After
80     public void after() {
81         kit.shutdown(system);
82     }
83
84     /**
85      * Given remote buckets, should merge with local copy of remote buckets.
86      */
87     @Test
88     public void testReceiveUpdateRemoteBuckets() throws Exception {
89
90         final ActorRef store = createStore();
91         Address localAddress = system.provider().getDefaultAddress();
92         Bucket<T> localBucket = new BucketImpl<>(0L, new T());
93
94         Address a1 = new Address("tcp", "system1");
95         Address a2 = new Address("tcp", "system2");
96         Address a3 = new Address("tcp", "system3");
97
98         Bucket<T> b1 = new BucketImpl<>(0L, new T());
99         Bucket<T> b2 = new BucketImpl<>(0L, new T());
100         Bucket<T> b3 = new BucketImpl<>(0L, new T());
101
102         Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
103         remoteBuckets.put(a1, b1);
104         remoteBuckets.put(a2, b2);
105         remoteBuckets.put(a3, b3);
106         remoteBuckets.put(localAddress, localBucket);
107
108         Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
109                 Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
110
111         //Given remote buckets
112         store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
113
114         //Should contain local bucket
115         //Should contain 4 entries i.e a1, a2, a3, local
116         Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
117         Assert.assertTrue(remoteBucketsInStore.size() == 4);
118
119         //Add a new remote bucket
120         Address a4 = new Address("tcp", "system4");
121         Bucket<T> b4 = new BucketImpl<>(0L, new T());
122         remoteBuckets.clear();
123         remoteBuckets.put(a4, b4);
124         store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
125
126         //Should contain a4
127         //Should contain 5 entries now i.e a1, a2, a3, a4, local
128         remoteBucketsInStore = getBuckets(store);
129         Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
130         Assert.assertTrue(remoteBucketsInStore.size() == 5);
131
132         //Update a bucket
133         Bucket<T> b3New = new BucketImpl<>(0L, new T());
134         remoteBuckets.clear();
135         remoteBuckets.put(a3, b3New);
136         remoteBuckets.put(a1, null);
137         remoteBuckets.put(a2, null);
138         store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
139
140         //Should only update a3
141         remoteBucketsInStore = getBuckets(store);
142         Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
143         Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
144
145         //Should NOT update a1 and a2
146         Bucket<T> b1InStore = remoteBucketsInStore.get(a1);
147         Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
148         Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
149         Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
150         Assert.assertTrue(remoteBucketsInStore.size() == 5);
151
152         //Should update versions map
153         //versions map contains versions for all remote buckets (4).
154         Map<Address, Long> versionsInStore = getVersions(store);
155         Assert.assertEquals(4, versionsInStore.size());
156         Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
157         Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
158         Assert.assertEquals((Long)b3New.getVersion(), versionsInStore.get(a3));
159         Assert.assertEquals((Long)b4.getVersion(), versionsInStore.get(a4));
160
161         //Send older version of bucket
162         remoteBuckets.clear();
163         remoteBuckets.put(a3, b3);
164         store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
165
166         //Should NOT update a3
167         remoteBucketsInStore = getBuckets(store);
168         b3InStore = remoteBucketsInStore.get(a3);
169         Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
170     }
171
172     /**
173      * Create BucketStore actor and returns the underlying instance of BucketStore class.
174      *
175      * @return instance of BucketStore class
176      */
177     private ActorRef createStore() {
178         return kit.childActorOf(Props.create(TestingBucketStore.class,
179             new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
180     }
181
182     @SuppressWarnings("unchecked")
183     private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
184         final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
185                 Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
186         return result.getBuckets();
187     }
188
189     @SuppressWarnings("unchecked")
190     private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
191         return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
192             Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
193     }
194
195     private static final class TestingBucketStore extends BucketStore<T> {
196
197         private final List<ActorRef> toNotify = new ArrayList<>();
198
199         TestingBucketStore(final RemoteRpcProviderConfig config,
200                                   final String persistenceId,
201                                   final T initialData) {
202             super(config, persistenceId, initialData);
203         }
204
205         @Override
206         protected void handleCommand(Object message) throws Exception {
207             if (message instanceof WaitUntilDonePersisting) {
208                 handlePersistAsk();
209             } else if (message instanceof SaveSnapshotSuccess) {
210                 super.handleCommand(message);
211                 handleSnapshotSuccess();
212             } else {
213                 super.handleCommand(message);
214             }
215         }
216
217         private void handlePersistAsk() {
218             if (isPersisting()) {
219                 toNotify.add(getSender());
220             } else {
221                 getSender().tell(new Success(null), noSender());
222             }
223         }
224
225         private void handleSnapshotSuccess() {
226             toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
227         }
228     }
229
230     /**
231      * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
232      */
233     private static final class WaitUntilDonePersisting {
234
235     }
236 }