+ @Override
+ public void setSystemListener(SystemNotificationsListener systemListener) {
+ this.systemListener = systemListener;
+ }
+
+ @Override
+ public void checkListeners() {
+ StringBuffer buffer = new StringBuffer();
+ if (systemListener == null) {
+ buffer.append("SystemListener ");
+ }
+ if (messageListener == null) {
+ buffer.append("MessageListener ");
+ }
+ if (connectionReadyListener == null) {
+ buffer.append("ConnectionReadyListener ");
+ }
+
+ if (buffer.length() > 0) {
+ throw new IllegalStateException("Missing listeners: " + buffer.toString());
+ }
+ }
+
+ static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
+ @Override
+ public void onRemoval(
+ RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
+ SettableFuture<?> future = notification.getValue();
+ if (!future.isDone()) {
+ LOG.warn("rpc response discarded: " + notification.getKey());
+ future.cancel(true);
+ }
+ }
+ }
+
+ /**
+ * Class is used ONLY for exiting msgQueue processing thread
+ * @author michal.polkorab
+ */
+ static class ExitingDataObject implements DataObject {
+ @Override
+ public Class<? extends DataContainer> getImplementedInterface() {
+ return null;
+ }
+ }
+
+ @Override
+ public void fireConnectionReadyNotification() {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ connectionReadyListener.onConnectionReady();
+ }
+ }).start();
+ }
+
+
+ @Override
+ public void setConnectionReadyListener(
+ ConnectionReadyListener connectionReadyListener) {
+ this.connectionReadyListener = connectionReadyListener;
+ }
+