/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.bgpcep.config.loader.impl;
import static java.util.Objects.requireNonNull;

import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.opendaylight.bgpcep.config.loader.spi.ConfigFileProcessor;
import org.opendaylight.bgpcep.config.loader.spi.ConfigLoader;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.concepts.AbstractRegistration;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.codec.xml.XmlParserStream;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

52 public final class ConfigLoaderImpl implements ConfigLoader, AutoCloseable {
53 private static final Logger LOG = LoggerFactory.getLogger(ConfigLoaderImpl.class);
54 private static final String INTERRUPTED = "InterruptedException";
55 private static final String EXTENSION = "-.*\\.xml";
56 private static final String INITIAL = "^";
57 private static final String READ = "rw";
58 private static final long TIMEOUT_SECONDS = 5;
60 private final Map<String, ConfigFileProcessor> configServices = new HashMap<>();
61 private final SchemaContext schemaContext;
62 private final BindingNormalizedNodeSerializer bindingSerializer;
63 private final String path;
64 private final Thread watcherThread;
65 private final File file;
67 private boolean closed = false;
69 public ConfigLoaderImpl(final SchemaContext schemaContext, final BindingNormalizedNodeSerializer bindingSerializer,
70 final FileWatcher fileWatcher) {
71 this.schemaContext = requireNonNull(schemaContext);
72 this.bindingSerializer = requireNonNull(bindingSerializer);
73 this.path = requireNonNull(fileWatcher.getPathFile());
74 this.file = new File(this.path);
75 this.watcherThread = new Thread(new ConfigLoaderImplRunnable(requireNonNull(fileWatcher.getWatchService())));
79 this.watcherThread.start();
80 LOG.info("Config Loader service initiated");
83 private synchronized void handleConfigFile(final ConfigFileProcessor config, final String filename) {
84 final NormalizedNode<?, ?> dto;
86 dto = parseDefaultConfigFile(config, filename);
87 } catch (final IOException | XMLStreamException e) {
88 LOG.warn("Failed to parse config file {}", filename, e);
91 LOG.info("Loading initial config {}", filename);
92 config.loadConfiguration(dto);
95 private NormalizedNode<?, ?> parseDefaultConfigFile(final ConfigFileProcessor config, final String filename)
96 throws IOException, XMLStreamException {
97 final NormalizedNodeResult result = new NormalizedNodeResult();
98 final NormalizedNodeStreamWriter streamWriter = ImmutableNormalizedNodeStreamWriter.from(result);
100 final File newFile = new File(this.path, filename);
101 FileChannel channel = new RandomAccessFile(newFile, READ).getChannel();
103 FileLock lock = null;
104 final Stopwatch stopwatch = Stopwatch.createStarted();
105 while (lock == null || stopwatch.elapsed(TimeUnit.SECONDS) > TIMEOUT_SECONDS) {
107 lock = channel.tryLock();
108 } catch (final IllegalStateException e) {
114 } catch (InterruptedException e) {
115 LOG.warn("Failed to lock xml", e);
120 try (InputStream resourceAsStream = new FileInputStream(newFile)) {
121 final XMLInputFactory factory = XMLInputFactory.newInstance();
122 final XMLStreamReader reader = factory.createXMLStreamReader(resourceAsStream);
124 final SchemaNode schemaNode = SchemaContextUtil
125 .findDataSchemaNode(this.schemaContext, config.getSchemaPath());
126 try (XmlParserStream xmlParser = XmlParserStream.create(streamWriter, this.schemaContext, schemaNode)) {
127 xmlParser.parse(reader);
128 } catch (final URISyntaxException | XMLStreamException | IOException | SAXException e) {
129 LOG.warn("Failed to parse xml", e);
136 return result.getResult();
140 public synchronized AbstractRegistration registerConfigFile(final ConfigFileProcessor config) {
141 final String pattern = INITIAL + config.getSchemaPath().getLastComponent().getLocalName() + EXTENSION;
142 this.configServices.put(pattern, config);
144 final File[] fList = this.file.listFiles();
145 final List<String> newFiles = new ArrayList<>();
147 for (final File newFile : fList) {
148 if (newFile.isFile()) {
149 final String filename = newFile.getName();
150 if (Pattern.matches(pattern, filename)) {
151 newFiles.add(filename);
156 for (final String filename : newFiles) {
157 handleConfigFile(config, filename);
159 return new AbstractRegistration() {
161 protected void removeRegistration() {
162 synchronized (ConfigLoaderImpl.this) {
163 ConfigLoaderImpl.this.configServices.remove(pattern);
170 public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() {
171 return this.bindingSerializer;
176 public synchronized void close() {
177 LOG.info("Config Loader service closed");
179 this.watcherThread.interrupt();
182 private class ConfigLoaderImplRunnable implements Runnable {
184 private final WatchService watchService;
186 ConfigLoaderImplRunnable(final WatchService watchService) {
187 this.watchService = watchService;
192 while (!Thread.currentThread().isInterrupted()) {
197 private synchronized void handleChanges() {
200 key = this.watchService.take();
201 } catch (final InterruptedException | ClosedWatchServiceException e) {
202 if (!ConfigLoaderImpl.this.closed) {
203 LOG.warn(INTERRUPTED, e);
204 Thread.currentThread().interrupt();
210 final List<String> fileNames = key.pollEvents()
211 .stream().map(event -> event.context().toString())
212 .collect(Collectors.toList());
213 fileNames.forEach(this::handleEvent);
215 final boolean reset = key.reset();
217 LOG.warn("Could not reset the watch key.");
222 private synchronized void handleEvent(final String filename) {
223 ConfigLoaderImpl.this.configServices.entrySet().stream()
224 .filter(entry -> Pattern.matches(entry.getKey(), filename))
225 .forEach(entry -> handleConfigFile(entry.getValue(), filename));