} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
} else {
if(log.isDebugEnabled()) {
log.debug("Unhandled message [{}]", message);
*/
void receiveGetAllBuckets(){
final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+ sender.tell(new GetAllBucketsReply<T>(getAllBuckets()), getSelf());
}
/**
*
* @return self owned + remote buckets
*/
- @SuppressWarnings("rawtypes")
- Map<Address, Bucket> getAllBuckets(){
- Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+ Map<Address, Bucket<T>> getAllBuckets(){
+ Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
all.put(selfAddress, new BucketImpl<>(localBucket));
*
* @param members requested members
*/
- @SuppressWarnings("rawtypes")
void receiveGetBucketsByMembers(Set<Address> members){
final ActorRef sender = getSender();
- Map<Address, Bucket> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+ Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
+ sender.tell(new GetBucketsByMembersReply<T>(buckets), getSelf());
}
/**
* @param members requested members
* @return buckets for requested memebers
*/
- @SuppressWarnings("rawtypes")
- Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
- Map<Address, Bucket> buckets = new HashMap<>();
+ Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+ Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
if (members.contains(selfAddress)) {
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- @SuppressWarnings({ "rawtypes", "unchecked" })
- void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+ void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets){
log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty())
{
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
- for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+ for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()){
Long localVersion = versions.get(entry.getKey());
if (localVersion == null) {
}
}
- public static class ContainsBuckets implements Serializable{
+ public static class ContainsBuckets<T extends Copier<T>> implements Serializable{
private static final long serialVersionUID = -4940160367495308286L;
- private final Map<Address, Bucket> buckets;
+ private final Map<Address, Bucket<T>> buckets;
- public ContainsBuckets(Map<Address, Bucket> buckets){
+ public ContainsBuckets(Map<Address, Bucket<T>> buckets){
Preconditions.checkArgument(buckets != null, "buckets can not be null");
this.buckets = buckets;
}
- public Map<Address, Bucket> getBuckets() {
- Map<Address, Bucket> copy = new HashMap<>(buckets.size());
+ public Map<Address, Bucket<T>> getBuckets() {
+ Map<Address, Bucket<T>> copy = new HashMap<>(buckets.size());
- for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
+ for (Map.Entry<Address, Bucket<T>> entry : buckets.entrySet()){
//ignore null entries
if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
continue;
}
}
- public static class GetAllBucketsReply extends ContainsBuckets implements Serializable{
+ public static class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
private static final long serialVersionUID = 1L;
- public GetAllBucketsReply(Map<Address, Bucket> buckets) {
+ public GetAllBucketsReply(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
- public static class GetBucketsByMembersReply extends ContainsBuckets implements Serializable{
+ public static class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
private static final long serialVersionUID = 1L;
- public GetBucketsByMembersReply(Map<Address, Bucket> buckets) {
+ public GetBucketsByMembersReply(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
}
}
- public static class UpdateRemoteBuckets extends ContainsBuckets implements Serializable{
+ public static class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
private static final long serialVersionUID = 1L;
- public UpdateRemoteBuckets(Map<Address, Bucket> buckets) {
+ public UpdateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
}
}
- public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
+ public static final class GossipEnvelope<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable {
private static final long serialVersionUID = 8346634072582438818L;
private final Address from;
private final Address to;
- public GossipEnvelope(Address from, Address to, Map<Address, Bucket> buckets) {
+ public GossipEnvelope(Address from, Address to, Map<Address, Bucket<T>> buckets) {
super(buckets);
Preconditions.checkArgument(to != null, "Recipient of message must not be null");
this.to = to;
// Bucket store should get an update bucket message. Updated bucket contains added rpc.
- Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
verifyBucket(buckets.get(nodeAddress), addedRouteIds);
Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
// Bucket store on node2 should get a message to update its local copy of remote buckets
- Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
verifyBucket(buckets.get(node1Address), addedRouteIds);
// Now remove
private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
throws AssertionError {
- Map<Address, Bucket> buckets;
+ Map<Address, Bucket<RoutingTable>> buckets;
int nTries = 0;
while(true) {
buckets = retrieveBuckets(registry1, testKit, address);
Address node1Address = node1.provider().getDefaultAddress();
Address node2Address = node2.provider().getDefaultAddress();
- Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+ Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
node2Address);
verifyBucket(buckets.get(node1Address), addedRouteIds1);
Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
}
- private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
+ private Map<Address, Bucket<RoutingTable>> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
Address... addresses) {
int nTries = 0;
while(true) {
bucketStore.tell(new GetAllBuckets(), testKit.getRef());
- GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ @SuppressWarnings("unchecked")
+ GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
GetAllBucketsReply.class);
- Map<Address, Bucket> buckets = reply.getBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = reply.getBuckets();
boolean foundAll = true;
for(Address addr: addresses) {
- Bucket bucket = buckets.get(addr);
+ Bucket<RoutingTable> bucket = buckets.get(addr);
if(bucket == null) {
foundAll = false;
break;
}
}
- @SuppressWarnings("unchecked")
@Test
public void testAddRoutesConcurrency() throws Exception {
final JavaTestKit testKit = new JavaTestKit(node1);
int nTries = 0;
while(true) {
registry1.tell(getAllBuckets, testKit.getRef());
- GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+ @SuppressWarnings("unchecked")
+ GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
RoutingTable table = localBucket.getData();
routeIds.add(new RouteIdentifierImpl(null, type, null));
return routeIds;
}
+
}
public class BucketStoreTest {
+ /**
+ * Dummy class to eliminate rawtype warnings.
+ *
+ * @author gwu
+ *
+ */
+ private static class T implements Copier<T> {
+ @Override
+ public T copy() {
+ return new T();
+ }
+ }
+
private static ActorSystem system;
@BeforeClass
@Test
public void testReceiveUpdateRemoteBuckets(){
- BucketStore store = createStore();
+ BucketStore<T> store = createStore();
Address localAddress = system.provider().getDefaultAddress();
- Bucket localBucket = new BucketImpl();
+ Bucket<T> localBucket = new BucketImpl<>();
Address a1 = new Address("tcp", "system1");
Address a2 = new Address("tcp", "system2");
Address a3 = new Address("tcp", "system3");
- Bucket b1 = new BucketImpl();
- Bucket b2 = new BucketImpl();
- Bucket b3 = new BucketImpl();
+ Bucket<T> b1 = new BucketImpl<>();
+ Bucket<T> b2 = new BucketImpl<>();
+ Bucket<T> b3 = new BucketImpl<>();
- Map<Address, Bucket> remoteBuckets = new HashMap<>(3);
+ Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
remoteBuckets.put(a1, b1);
remoteBuckets.put(a2, b2);
remoteBuckets.put(a3, b3);
//Should NOT contain local bucket
//Should contain ONLY 3 entries i.e a1, a2, a3
- Map<Address, Bucket<?>> remoteBucketsInStore = store.getRemoteBuckets();
+ Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
Assert.assertTrue(remoteBucketsInStore.size() == 3);
//Add a new remote bucket
Address a4 = new Address("tcp", "system4");
- Bucket b4 = new BucketImpl();
+ Bucket<T> b4 = new BucketImpl<T>();
remoteBuckets.clear();
remoteBuckets.put(a4, b4);
store.receiveUpdateRemoteBuckets(remoteBuckets);
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Update a bucket
- Bucket b3_new = new BucketImpl();
+ Bucket<T> b3_new = new BucketImpl<T>();
remoteBuckets.clear();
remoteBuckets.put(a3, b3_new);
remoteBuckets.put(a1, null);
//Should only update a3
remoteBucketsInStore = store.getRemoteBuckets();
- Bucket b3_inStore = remoteBucketsInStore.get(a3);
+ Bucket<T> b3_inStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3_new.getVersion(), b3_inStore.getVersion());
//Should NOT update a1 and a2
- Bucket b1_inStore = remoteBucketsInStore.get(a1);
- Bucket b2_inStore = remoteBucketsInStore.get(a2);
+ Bucket<T> b1_inStore = remoteBucketsInStore.get(a1);
+ Bucket<T> b2_inStore = remoteBucketsInStore.get(a2);
Assert.assertEquals(b1.getVersion(), b1_inStore.getVersion());
Assert.assertEquals(b2.getVersion(), b2_inStore.getVersion());
Assert.assertTrue(remoteBucketsInStore.size() == 4);
*
* @return instance of BucketStore class
*/
- private static BucketStore createStore(){
+ private static BucketStore<T> createStore(){
final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()));
- final TestActorRef<BucketStore> testRef = TestActorRef.create(system, props, "testStore");
+ final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
return testRef.underlyingActor();
}
-}
\ No newline at end of file
+
+}