BGPCEP-770: Lock file before processing
[bgpcep.git] / config-loader / config-loader-impl / src / main / java / org / opendaylight / bgpcep / config / loader / impl / ConfigLoaderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.bgpcep.config.loader.impl;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.Stopwatch;
14 import java.io.File;
15 import java.io.FileInputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.RandomAccessFile;
19 import java.net.URISyntaxException;
20 import java.nio.channels.FileChannel;
21 import java.nio.channels.FileLock;
22 import java.nio.file.ClosedWatchServiceException;
23 import java.nio.file.WatchKey;
24 import java.nio.file.WatchService;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import java.util.regex.Pattern;
31 import java.util.stream.Collectors;
32 import javax.annotation.concurrent.GuardedBy;
33 import javax.xml.parsers.ParserConfigurationException;
34 import javax.xml.stream.XMLInputFactory;
35 import javax.xml.stream.XMLStreamException;
36 import javax.xml.stream.XMLStreamReader;
37 import org.opendaylight.bgpcep.config.loader.spi.ConfigFileProcessor;
38 import org.opendaylight.bgpcep.config.loader.spi.ConfigLoader;
39 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
40 import org.opendaylight.yangtools.concepts.AbstractRegistration;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
43 import org.opendaylight.yangtools.yang.data.codec.xml.XmlParserStream;
44 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter;
45 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
48 import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.xml.sax.SAXException;
52
53 public final class ConfigLoaderImpl implements ConfigLoader, AutoCloseable {
54     private static final Logger LOG = LoggerFactory.getLogger(ConfigLoaderImpl.class);
55     private static final String INTERRUPTED = "InterruptedException";
56     private static final String EXTENSION = "-.*\\.xml";
57     private static final String INITIAL = "^";
58     private static final String READ = "rw";
59     private static final long TIMEOUT_SECONDS = 5;
60     @GuardedBy("this")
61     private final Map<String, ConfigFileProcessor> configServices = new HashMap<>();
62     private final SchemaContext schemaContext;
63     private final BindingNormalizedNodeSerializer bindingSerializer;
64     private final String path;
65     private final Thread watcherThread;
66     private final File file;
67     @GuardedBy("this")
68     private boolean closed = false;
69
70     public ConfigLoaderImpl(final SchemaContext schemaContext, final BindingNormalizedNodeSerializer bindingSerializer,
71             final FileWatcher fileWatcher) {
72         this.schemaContext = requireNonNull(schemaContext);
73         this.bindingSerializer = requireNonNull(bindingSerializer);
74         this.path = requireNonNull(fileWatcher.getPathFile());
75         this.file = new File(this.path);
76         this.watcherThread = new Thread(new ConfigLoaderImplRunnable(requireNonNull(fileWatcher.getWatchService())));
77     }
78
79     public void init() {
80         this.watcherThread.start();
81         LOG.info("Config Loader service initiated");
82     }
83
84     private void handleConfigFile(final ConfigFileProcessor config, final String filename) {
85         final NormalizedNode<?, ?> dto;
86         try {
87             dto = parseDefaultConfigFile(config, filename);
88         } catch (final IOException | XMLStreamException e) {
89             LOG.warn("Failed to parse config file {}", filename, e);
90             return;
91         }
92         LOG.info("Loading initial config {}", filename);
93         config.loadConfiguration(dto);
94     }
95
96     private NormalizedNode<?, ?> parseDefaultConfigFile(final ConfigFileProcessor config, final String filename)
97             throws IOException, XMLStreamException {
98         final NormalizedNodeResult result = new NormalizedNodeResult();
99         final NormalizedNodeStreamWriter streamWriter = ImmutableNormalizedNodeStreamWriter.from(result);
100
101         final File newFile = new File(this.path, filename);
102         FileChannel channel = new RandomAccessFile(newFile, READ).getChannel();
103
104         FileLock lock = null;
105         final Stopwatch stopwatch = Stopwatch.createStarted();
106         while (lock == null || stopwatch.elapsed(TimeUnit.SECONDS) > TIMEOUT_SECONDS) {
107             try {
108                 lock = channel.tryLock();
109             } catch (final IllegalStateException e) {
110                 //Ignore
111             }
112             if (lock == null) {
113                 try {
114                     Thread.sleep(100L);
115                 } catch (InterruptedException e) {
116                     LOG.warn("Failed to lock xml", e);
117                 }
118             }
119         }
120
121         try (InputStream resourceAsStream = new FileInputStream(newFile)) {
122             final XMLInputFactory factory = XMLInputFactory.newInstance();
123             final XMLStreamReader reader = factory.createXMLStreamReader(resourceAsStream);
124
125             final SchemaNode schemaNode = SchemaContextUtil
126                     .findDataSchemaNode(this.schemaContext, config.getSchemaPath());
127             try (XmlParserStream xmlParser = XmlParserStream.create(streamWriter, this.schemaContext, schemaNode)) {
128                 xmlParser.parse(reader);
129             } catch (final URISyntaxException | XMLStreamException | IOException | ParserConfigurationException
130                     | SAXException e) {
131                 LOG.warn("Failed to parse xml", e);
132             } finally {
133                 reader.close();
134                 channel.close();
135             }
136         }
137
138         return result.getResult();
139     }
140
141     @Override
142     public synchronized AbstractRegistration registerConfigFile(final ConfigFileProcessor config) {
143         final String pattern = INITIAL + config.getSchemaPath().getLastComponent().getLocalName() + EXTENSION;
144         this.configServices.put(pattern, config);
145
146         final File[] fList = this.file.listFiles();
147         final List<String> newFiles = new ArrayList<>();
148         if (fList != null) {
149             for (final File newFile : fList) {
150                 if (newFile.isFile()) {
151                     final String filename = newFile.getName();
152                     if (Pattern.matches(pattern, filename)) {
153                         newFiles.add(filename);
154                     }
155                 }
156             }
157         }
158         for (final String filename : newFiles) {
159             handleConfigFile(config, filename);
160         }
161         return new AbstractRegistration() {
162             @Override
163             protected void removeRegistration() {
164                 synchronized (ConfigLoaderImpl.this) {
165                     ConfigLoaderImpl.this.configServices.remove(pattern);
166                 }
167             }
168         };
169     }
170
171     @Override
172     public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() {
173         return this.bindingSerializer;
174     }
175
176
177     @Override
178     public synchronized void close() {
179         LOG.info("Config Loader service closed");
180         this.closed = true;
181         this.watcherThread.interrupt();
182     }
183
184     private class ConfigLoaderImplRunnable implements Runnable {
185         @GuardedBy("this")
186         private final WatchService watchService;
187
188         ConfigLoaderImplRunnable(final WatchService watchService) {
189             this.watchService = watchService;
190         }
191
192         @Override
193         public void run() {
194             while (!Thread.currentThread().isInterrupted()) {
195                 handleChanges();
196             }
197         }
198
199         private synchronized void handleChanges() {
200             final WatchKey key;
201             try {
202                 key = this.watchService.take();
203             } catch (final InterruptedException | ClosedWatchServiceException e) {
204                 if (!ConfigLoaderImpl.this.closed) {
205                     LOG.warn(INTERRUPTED, e);
206                     Thread.currentThread().interrupt();
207                 }
208                 return;
209             }
210
211             if (key != null) {
212                 final List<String> fileNames = key.pollEvents()
213                         .stream().map(event -> event.context().toString())
214                         .collect(Collectors.toList());
215                 fileNames.forEach(this::handleEvent);
216
217                 final boolean reset = key.reset();
218                 if (!reset) {
219                     LOG.warn("Could not reset the watch key.");
220                 }
221             }
222         }
223
224         private synchronized void handleEvent(final String filename) {
225             ConfigLoaderImpl.this.configServices.entrySet().stream()
226                     .filter(entry -> Pattern.matches(entry.getKey(), filename))
227                     .forEach(entry -> handleConfigFile(entry.getValue(), filename));
228         }
229     }
230 }