This makes sure we check for failure before touching the result,
which is null if a failure occurs.
In order to keep disagnosti information we add a reference
to the message class being broadcast.
Change-Id: I26ab31a45916d11b61b990020bed89ae87233b14
Signed-off-by: Robert Varga <rovarga@cisco.com>
public Object apply(Short version) {
return new CloseTransactionChain(getHistoryId(), version).toSerializable();
}
public Object apply(Short version) {
return new CloseTransactionChain(getHistoryId(), version).toSerializable();
}
+ }, CloseTransactionChain.class);
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
/**
* Send the message to each and every shard
/**
* Send the message to each and every shard
- public void broadcast(final Function<Short, Object> messageSupplier){
+ public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass){
for(final String shardName : configuration.getAllShardNames()){
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
for(final String shardName : configuration.getAllShardNames()){
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
- Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
- message.getClass().getSimpleName(), shardName, failure);
+ messageClass.getSimpleName(), shardName, failure);
+ Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
public void testClose() {
new TransactionChainProxy(mockComponentFactory, historyId).close();
public void testClose() {
new TransactionChainProxy(mockComponentFactory, historyId).close();
- verify(mockActorContext, times(1)).broadcast(any(Function.class));
+ verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
public Object apply(Short v) {
return new TestMessage();
}
public Object apply(Short v) {
return new TestMessage();
}
MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);