- }\r
-\r
- override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
- commitHandlers.put(path, registration)\r
- LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onRegister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
- }\r
- }\r
- return registration;\r
+ }
+
+ private static def <T> withLock(Lock lock,Callable<T> method) {
+ lock.lock
+ try {
+ return method.call
+ } finally {
+ lock.unlock
+ }
+ } \r
+\r
+ override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
+ return withLock(registrationLock) [|\r
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+ commitHandlers.put(path, registration)\r
+ LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+ for(listener : commitHandlerRegistrationListeners) {\r
+ try {\r
+ listener.instance.onRegister(registration);\r
+ } catch (Exception e) {\r
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+ }\r
+ }
+ return registration;
+ ]\r