import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
+@Designate(ocd = DeviceTransactionManagerImpl.Configuration.class)
public final class DeviceTransactionManagerImpl implements DeviceTransactionManager {
+ @ObjectClassDefinition
+ public @interface Configuration {
+ @AttributeDefinition(description = "Minimum number of threads in the checking pool", min = "0")
+ int checkingMinThreads() default DEFAULT_CHECKING_MIN_THREADS;
+ @AttributeDefinition(description = "Number of threads in the listening pool", min = "1")
+ int listeningThreads() default DEFAULT_LISTENING_THREADS;
+ @AttributeDefinition(description = "Maximum time to wait for transaction submit, in milliseconds", min = "0")
+ long maxDurationToSubmit() default DEFAULT_MAX_DURATION_TO_SUBMIT;
+ @AttributeDefinition(description = "Maximum time to wait for get-data submit, in milliseconds", min = "0")
+ long maxDurationToGetData() default DEFAULT_MAX_DURATION_TO_GET_DATA;
+ }
// TODO cache device data brokers
// TODO remove disconnected devices from maps
private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
- // TODO add an @ObjectClassDefinition to make these configurable at runtime
- private static final int NUMBER_OF_THREADS = 4;
- private static final long GET_DATA_SUBMIT_TIMEOUT = 3000;
- private static final TimeUnit GET_DATA_SUBMIT_TIME_UNIT = TimeUnit.MILLISECONDS;
- // TODO set reasonable value here for maxDurationToSubmitTransaction
- private static final long MAX_DURATION_TO_SUBMIT = 15000;
- private static final TimeUnit MAX_DURATION_TO_SUBMIT_TIMEUNIT = TimeUnit.MILLISECONDS;
+ private static final long DEFAULT_MAX_DURATION_TO_GET_DATA = 3000;
+ private static final long DEFAULT_MAX_DURATION_TO_SUBMIT = 15000;
+ private static final int DEFAULT_CHECKING_MIN_THREADS = 4;
+ private static final int DEFAULT_LISTENING_THREADS = 4;
private final MountPointService mountPointService;
private final ScheduledExecutorService checkingExecutor;
private final ListeningExecutorService listeningExecutor;
private final ConcurrentMap<String, CountDownLatch> deviceLocks = new ConcurrentHashMap<>();
private final long maxDurationToSubmitTransaction;
+ private final long maxDurationToGetData;
@Activate
- public DeviceTransactionManagerImpl(@Reference MountPointService mountPointService) {
- this(mountPointService, MAX_DURATION_TO_SUBMIT);
+ public DeviceTransactionManagerImpl(@Reference MountPointService mountPointService, Configuration configuration) {
+ this(mountPointService, configuration.maxDurationToSubmit(), configuration.maxDurationToGetData(),
+ configuration.checkingMinThreads(), configuration.listeningThreads());
+ }
+
+ public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
+ this(mountPointService, maxDurationToSubmitTransaction, DEFAULT_MAX_DURATION_TO_GET_DATA,
+ DEFAULT_CHECKING_MIN_THREADS, DEFAULT_LISTENING_THREADS);
}
- public DeviceTransactionManagerImpl(MountPointService mountPointService,
- long maxDurationToSubmitTransaction) {
+ public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction,
+ long maxDurationToGetData, int checkingPoolMinThreads, int listeningPoolThreads) {
this.mountPointService = requireNonNull(mountPointService);
this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
- this.checkingExecutor = Executors.newScheduledThreadPool(NUMBER_OF_THREADS);
- this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUMBER_OF_THREADS));
+ this.maxDurationToGetData = maxDurationToGetData;
+ this.checkingExecutor = Executors.newScheduledThreadPool(checkingPoolMinThreads);
+ this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(listeningPoolThreads));
}
@Override
public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
- return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, MAX_DURATION_TO_SUBMIT_TIMEUNIT);
+ return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, TimeUnit.MILLISECONDS);
}
@Override
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
} finally {
- deviceTx.commit(GET_DATA_SUBMIT_TIMEOUT, GET_DATA_SUBMIT_TIME_UNIT);
+ deviceTx.commit(maxDurationToGetData, TimeUnit.MILLISECONDS);
}
} else {
LOG.error("Could not obtain transaction for device {}!", deviceId);