import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+ import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
}
+ @Override public void onReceiveRecover(Object message) {
+ LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+ getSender());
+
+ if (message instanceof RecoveryFailure){
+ LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ } else {
+ super.onReceiveRecover(message);
+ }
+ }
+
@Override public void onReceiveCommand(Object message) {
- LOG.debug("Received message {} from {}", message.getClass().toString(),
+ LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
getSender());
if (message.getClass()
.tell(new CreateTransactionReply(
Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId()).toSerializable(),
- getSelf());
+ getSelf()
+ );
}
private void commit(final ActorRef sender, Object serialized) {
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- sender.tell(new CommitTransactionReply().toSerializable(),self);
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
+ sender.tell(new CommitTransactionReply().toSerializable(), self);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(new Date());
}
@Override
identifier, clientActor.path().toString());
}
-
} else {
LOG.error("Unknown state received {}", data);
}
}
- @Override protected Object createSnapshot() {
+ @Override protected void createSnapshot() {
throw new UnsupportedOperationException("createSnapshot");
}
- @Override protected void applySnapshot(Object snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
throw new UnsupportedOperationException("applySnapshot");
}