2b9b9d635e4f4e0861d71ba00c833efff002ad44
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
56
57 public class ActorContextTest extends AbstractActorTest{
58
    // Logger for this test class; the nested MockShardManager also uses it to report
    // FindPrimary requests for which no canned response has been registered.
    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
60
    /**
     * Empty marker message. testBroadcast sends an instance through ActorContext.broadcast and
     * then looks for a message of this class on the collecting shard actors.
     */
    private static class TestMessage {
    }
63
64     private static class MockShardManager extends UntypedActor {
65
66         private final boolean found;
67         private final ActorRef actorRef;
68         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
69
70         private MockShardManager(boolean found, ActorRef actorRef){
71
72             this.found = found;
73             this.actorRef = actorRef;
74         }
75
76         @Override public void onReceive(Object message) throws Exception {
77             if(message instanceof FindPrimary) {
78                 FindPrimary fp = (FindPrimary)message;
79                 Object resp = findPrimaryResponses.get(fp.getShardName());
80                 if(resp == null) {
81                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
82                 } else {
83                     getSender().tell(resp, getSelf());
84                 }
85
86                 return;
87             }
88
89             if(found){
90                 getSender().tell(new LocalShardFound(actorRef), getSelf());
91             } else {
92                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
93             }
94         }
95
96         void addFindPrimaryResp(String shardName, Object resp) {
97             findPrimaryResponses.put(shardName, resp);
98         }
99
100         private static Props props(final boolean found, final ActorRef actorRef){
101             return Props.create(new MockShardManagerCreator(found, actorRef) );
102         }
103
104         private static Props props(){
105             return Props.create(new MockShardManagerCreator() );
106         }
107
108         @SuppressWarnings("serial")
109         private static class MockShardManagerCreator implements Creator<MockShardManager> {
110             final boolean found;
111             final ActorRef actorRef;
112
113             MockShardManagerCreator() {
114                 this.found = false;
115                 this.actorRef = null;
116             }
117
118             MockShardManagerCreator(boolean found, ActorRef actorRef) {
119                 this.found = found;
120                 this.actorRef = actorRef;
121             }
122
123             @Override
124             public MockShardManager create() throws Exception {
125                 return new MockShardManager(found, actorRef);
126             }
127         }
128     }
129
130     @Test
131     public void testFindLocalShardWithShardFound(){
132         new JavaTestKit(getSystem()) {{
133
134             new Within(duration("1 seconds")) {
135                 @Override
136                 protected void run() {
137
138                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
139
140                     ActorRef shardManagerActorRef = getSystem()
141                         .actorOf(MockShardManager.props(true, shardActorRef));
142
143                     ActorContext actorContext =
144                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
145                             mock(Configuration.class));
146
147                     Optional<ActorRef> out = actorContext.findLocalShard("default");
148
149                     assertEquals(shardActorRef, out.get());
150
151
152                     expectNoMsg();
153                 }
154             };
155         }};
156
157     }
158
159     @Test
160     public void testFindLocalShardWithShardNotFound(){
161         new JavaTestKit(getSystem()) {{
162             ActorRef shardManagerActorRef = getSystem()
163                     .actorOf(MockShardManager.props(false, null));
164
165             ActorContext actorContext =
166                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
167                             mock(Configuration.class));
168
169             Optional<ActorRef> out = actorContext.findLocalShard("default");
170             assertTrue(!out.isPresent());
171         }};
172
173     }
174
175     @Test
176     public void testExecuteRemoteOperation() {
177         new JavaTestKit(getSystem()) {{
178             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
179
180             ActorRef shardManagerActorRef = getSystem()
181                     .actorOf(MockShardManager.props(true, shardActorRef));
182
183             ActorContext actorContext =
184                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
185                             mock(Configuration.class));
186
187             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
188
189             Object out = actorContext.executeOperation(actor, "hello");
190
191             assertEquals("hello", out);
192         }};
193     }
194
195     @Test
196     public void testExecuteRemoteOperationAsync() {
197         new JavaTestKit(getSystem()) {{
198             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
199
200             ActorRef shardManagerActorRef = getSystem()
201                     .actorOf(MockShardManager.props(true, shardActorRef));
202
203             ActorContext actorContext =
204                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
205                             mock(Configuration.class));
206
207             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
208
209             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
210
211             try {
212                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
213                 assertEquals("Result", "hello", result);
214             } catch(Exception e) {
215                 throw new AssertionError(e);
216             }
217         }};
218     }
219
220     @Test
221     public void testIsPathLocal() {
222         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
223         ActorContext actorContext = null;
224
225         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertEquals(false, actorContext.isPathLocal(null));
227         assertEquals(false, actorContext.isPathLocal(""));
228
229         clusterWrapper.setSelfAddress(null);
230         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231         assertEquals(false, actorContext.isPathLocal(""));
232
233         // even if the path is in local format, match the primary path (first 3 elements) and return true
234         clusterWrapper.setSelfAddress(new Address("akka", "test"));
235         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
237
238         clusterWrapper.setSelfAddress(new Address("akka", "test"));
239         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
241
242         clusterWrapper.setSelfAddress(new Address("akka", "test"));
243         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
245
246         // self address of remote format,but Tx path local format.
247         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
248         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249         assertEquals(true, actorContext.isPathLocal(
250             "akka://system/user/shardmanager/shard/transaction"));
251
252         // self address of local format,but Tx path remote format.
253         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
254         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255         assertEquals(false, actorContext.isPathLocal(
256             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
257
258         //local path but not same
259         clusterWrapper.setSelfAddress(new Address("akka", "test"));
260         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
262
263         //ip and port same
264         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
265         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
267
268         // forward-slash missing in address
269         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
270         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
272
273         //ips differ
274         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
277
278         //ports differ
279         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
280         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
281         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
282     }
283
284     @Test
285     public void testResolvePathForRemoteActor() {
286         ActorContext actorContext =
287                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
288                         ClusterWrapper.class),
289                         mock(Configuration.class));
290
291         String actual = actorContext.resolvePath(
292                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
293                 "akka://system/user/shardmanager/shard/transaction");
294
295         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
296
297         assertEquals(expected, actual);
298     }
299
300     @Test
301     public void testResolvePathForLocalActor() {
302         ActorContext actorContext =
303                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
304                         mock(Configuration.class));
305
306         String actual = actorContext.resolvePath(
307                 "akka://system/user/shardmanager/shard",
308                 "akka://system/user/shardmanager/shard/transaction");
309
310         String expected = "akka://system/user/shardmanager/shard/transaction";
311
312         assertEquals(expected, actual);
313     }
314
315     @Test
316     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
317         ActorContext actorContext =
318                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
319                         mock(Configuration.class));
320
321         String actual = actorContext.resolvePath(
322                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
323                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
324
325         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
326
327         assertEquals(expected, actual);
328     }
329
330
331     @Test
332     public void testClientDispatcherIsGlobalDispatcher(){
333         ActorContext actorContext =
334                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
335                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
336
337         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
338
339     }
340
341     @Test
342     public void testClientDispatcherIsNotGlobalDispatcher(){
343         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
344
345         ActorContext actorContext =
346                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
347                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
348
349         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
350
351         actorSystem.shutdown();
352
353     }
354
355     @Test
356     public void testSetDatastoreContext() {
357         new JavaTestKit(getSystem()) {{
358             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
359                             mock(Configuration.class), DatastoreContext.newBuilder().
360                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
361
362             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
363             assertEquals("getTransactionCommitOperationTimeout", 7,
364                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
365
366             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
367                     shardTransactionCommitTimeoutInSeconds(8).build();
368
369             actorContext.setDatastoreContext(newContext);
370
371             expectMsgClass(duration("5 seconds"), DatastoreContext.class);
372
373             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
374
375             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
376             assertEquals("getTransactionCommitOperationTimeout", 8,
377                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
378         }};
379     }
380
381     @Test
382     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
383
384             TestActorRef<MessageCollectorActor> shardManager =
385                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
386
387             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
388                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
389
390             final String expPrimaryPath = "akka://test-system/find-primary-shard";
391             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
392             ActorContext actorContext =
393                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
394                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
395                         @Override
396                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
397                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
398                         }
399                     };
400
401             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
402             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
403
404             assertNotNull(actual);
405             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
406             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
407                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
408             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
409
410             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
411
412             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
413
414             assertEquals(cachedInfo, actual);
415
416             actorContext.getPrimaryShardInfoCache().remove("foobar");
417
418             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
419
420             assertNull(cached);
421     }
422
423     @Test
424     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
425
426             TestActorRef<MessageCollectorActor> shardManager =
427                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
428
429             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
430                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
431
432             final DataTree mockDataTree = Mockito.mock(DataTree.class);
433             final String expPrimaryPath = "akka://test-system/find-primary-shard";
434             ActorContext actorContext =
435                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
436                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
437                         @Override
438                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
439                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
440                         }
441                     };
442
443             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
444             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
445
446             assertNotNull(actual);
447             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
448             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
449             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
450                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
451             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
452
453             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
454
455             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
456
457             assertEquals(cachedInfo, actual);
458
459             actorContext.getPrimaryShardInfoCache().remove("foobar");
460
461             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
462
463             assertNull(cached);
464     }
465
466     @Test
467     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
468         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
469     }
470
471     @Test
472     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
473         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
474     }
475
476     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
477         TestActorRef<MessageCollectorActor> shardManager =
478             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
479
480         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
481             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
482
483         ActorContext actorContext =
484             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
485                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
486                 @Override
487                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
488                     return Futures.successful(expectedException);
489                 }
490             };
491
492         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
493
494         try {
495             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
496             fail("Expected" + expectedException.getClass().toString());
497         } catch(Exception e){
498             if(!expectedException.getClass().isInstance(e)) {
499                 fail("Expected Exception of type " + expectedException.getClass().toString());
500             }
501         }
502
503         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
504
505         assertNull(cached);
506     }
507
508     @Test
509     public void testBroadcast() {
510         new JavaTestKit(getSystem()) {{
511             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
512             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
513
514             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
515             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
516             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
517                     DataStoreVersions.CURRENT_VERSION));
518             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
519                     DataStoreVersions.CURRENT_VERSION));
520             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
521
522             Configuration mockConfig = mock(Configuration.class);
523             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
524                     when(mockConfig).getAllShardNames();
525
526             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
527                     mock(ClusterWrapper.class), mockConfig,
528                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
529
530             actorContext.broadcast(new TestMessage());
531
532             expectFirstMatching(shardActorRef1, TestMessage.class);
533             expectFirstMatching(shardActorRef2, TestMessage.class);
534         }};
535     }
536
537     private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
538         int count = 5000 / 50;
539         for(int i = 0; i < count; i++) {
540             try {
541                 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
542                 if(message != null) {
543                     return message;
544                 }
545             } catch (Exception e) {}
546
547             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
548         }
549
550         Assert.fail("Did not receive message of type " + clazz);
551         return null;
552     }
553 }