import static java.util.Objects.requireNonNull;
+import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.RandomAccessFile;
import java.net.URISyntaxException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
import java.nio.file.ClosedWatchServiceException;
-import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.stream.XMLInputFactory;
private static final String INTERRUPTED = "InterruptedException";
private static final String EXTENSION = "-.*\\.xml";
private static final String INITIAL = "^";
+ private static final String READ = "rw";
+ private static final long TIMEOUT_SECONDS = 5;
@GuardedBy("this")
private final Map<String, ConfigFileProcessor> configServices = new HashMap<>();
private final SchemaContext schemaContext;
private final BindingNormalizedNodeSerializer bindingSerializer;
private final String path;
private final Thread watcherThread;
+ private final File file;
@GuardedBy("this")
private boolean closed = false;
this.bindingSerializer = requireNonNull(bindingSerializer);
this.path = requireNonNull(path);
requireNonNull(watchService);
+ this.file = new File(this.path);
this.watcherThread = new Thread(new ConfigLoaderImplRunnable(watchService));
this.watcherThread.start();
LOG.info("Config Loader service initiated");
final NormalizedNode<?, ?> dto;
try {
dto = parseDefaultConfigFile(config, filename);
- } catch (final IOException | XMLStreamException | ParserConfigurationException | SAXException
- | URISyntaxException e) {
+ } catch (final IOException | XMLStreamException e) {
LOG.warn("Failed to parse config file {}", filename, e);
return;
}
}
private NormalizedNode<?, ?> parseDefaultConfigFile(final ConfigFileProcessor config, final String filename)
- throws IOException, XMLStreamException, ParserConfigurationException, SAXException, URISyntaxException {
+ throws IOException, XMLStreamException {
final NormalizedNodeResult result = new NormalizedNodeResult();
final NormalizedNodeStreamWriter streamWriter = ImmutableNormalizedNodeStreamWriter.from(result);
- final InputStream resourceAsStream = new FileInputStream(new File(this.path, filename));
- final XMLInputFactory factory = XMLInputFactory.newInstance();
- final XMLStreamReader reader = factory.createXMLStreamReader(resourceAsStream);
+ final File newFile = new File(this.path, filename);
+ FileChannel channel = new RandomAccessFile(newFile, READ).getChannel();
- final SchemaNode schemaNode = SchemaContextUtil.findDataSchemaNode(this.schemaContext, config.getSchemaPath());
- final XmlParserStream xmlParser = XmlParserStream.create(streamWriter, this.schemaContext, schemaNode);
- xmlParser.parse(reader);
+ FileLock lock = null;
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ while (lock == null || stopwatch.elapsed(TimeUnit.SECONDS) > TIMEOUT_SECONDS) {
+ try {
+ lock = channel.tryLock();
+ } catch (final IllegalStateException e) {
+ //Ignore
+ }
+ if (lock == null) {
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to lock xml", e);
+ }
+ }
+ }
+
+ try (InputStream resourceAsStream = new FileInputStream(newFile)) {
+ final XMLInputFactory factory = XMLInputFactory.newInstance();
+ final XMLStreamReader reader = factory.createXMLStreamReader(resourceAsStream);
+
+ final SchemaNode schemaNode = SchemaContextUtil
+ .findDataSchemaNode(this.schemaContext, config.getSchemaPath());
+ try (XmlParserStream xmlParser = XmlParserStream.create(streamWriter, this.schemaContext, schemaNode)) {
+ xmlParser.parse(reader);
+ } catch (final URISyntaxException | XMLStreamException | IOException | ParserConfigurationException
+ | SAXException e) {
+ LOG.warn("Failed to parse xml", e);
+ } finally {
+ reader.close();
+ channel.close();
+ }
+ }
return result.getResult();
}
final String pattern = INITIAL + config.getSchemaPath().getLastComponent().getLocalName() + EXTENSION;
this.configServices.put(pattern, config);
- final File[] fList = new File(this.path).listFiles();
+ final File[] fList = this.file.listFiles();
+ final List<String> newFiles = new ArrayList<>();
if (fList != null) {
- for (final File file : fList) {
- if (file.isFile()) {
- final String filename = file.getName();
+ for (final File newFile : fList) {
+ if (newFile.isFile()) {
+ final String filename = newFile.getName();
if (Pattern.matches(pattern, filename)) {
- handleConfigFile(config, filename);
+ newFiles.add(filename);
}
}
}
}
-
+ for (final String filename : newFiles) {
+ handleConfigFile(config, filename);
+ }
return new AbstractRegistration() {
@Override
protected void removeRegistration() {
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() {
LOG.info("Config Loader service closed");
this.closed = true;
this.watcherThread.interrupt();
}
if (key != null) {
- for (final WatchEvent<?> event : key.pollEvents()) {
- handleEvent(event.context().toString());
- }
+ final List<String> fileNames = key.pollEvents()
+ .stream().map(event -> event.context().toString())
+ .collect(Collectors.toList());
+ fileNames.forEach(this::handleEvent);
+
final boolean reset = key.reset();
if (!reset) {
LOG.warn("Could not reset the watch key.");
.forEach(entry -> handleConfigFile(entry.getValue(), filename));
}
}
-}
\ No newline at end of file
+}