#!/usr/bin/env python3
'''This script is designed to load quasi-static data into a PostGIS database
for rendering maps. It differs from the usual scripts to do this in that it is
designed to take its configuration from a file rather than be a series of shell
commands.

Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be running multiple times in parallel. This is unlikely
  in practice because the script is expected to be called daily or less often,
  not minutely.
- Usage patterns will be similar to typical map rendering
'''
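
# An illustrative configuration file (hypothetical values; the keys are the
# ones this script reads below):
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#   sources:
#     water_polygons:
#       url: https://example.com/water-polygons.zip
#       file: water_polygons.shp
#       archive:
#         format: zip
#         files:
#           - water_polygons.shp
#       ogropts: ["-t_srs", "EPSG:3857"]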

import yaml
import os
import os.path
import re
import argparse
import shutil

# modules for getting data
import zipfile
import requests
import io
import time
import email.utils

# modules for converting and postgres loading
import subprocess
import psycopg2

import logging


def database_setup(conn, temp_schema, schema, metadata_table):
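    '''Create the temporary loading schema and, in the destination schema,
    the metadata table that tracks each table's last-modified date, if they
    do not already exist.
    '''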
    with conn.cursor() as cur:
        cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
                    .format(temp_schema=temp_schema))
        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                     ''' (name text primary key, last_modified text);''')
                    .format(schema=schema, metadata_table=metadata_table))
    conn.commit()


class Table:
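    '''One external table: loaded into the temporary schema, indexed and
    clustered there, then moved into the destination schema, with the
    metadata table recording its last-modified date.
    '''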

    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table

    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

    # get the last modified date from the metadata table
    def last_modified(self):
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]
        return ''

    def index(self):
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates an ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # sorting static tables helps performance and reduces size from the column drop above
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

        # VACUUM can't be run in a transaction, so autocommit needs to be turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            self._conn.autocommit = old_autocommit

    def replace(self, new_last_modified):
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # We checked if the metadata table had this table way up above
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
        self._conn.commit()


def should_redownload(req, url, file_name):
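    '''Decide with a HEAD request whether file_name needs to be fetched
    again: redownload if the file is missing, if the server copy is newer,
    or if the Content-Length does not match the local size (or is absent).
    '''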
    if not os.path.isfile(file_name):
        return True
    download_head = req.head(url, allow_redirects=True)
    if 'last-modified' in download_head.headers:
        last_modified = time.mktime(email.utils.parsedate(download_head.headers['last-modified']))
        if last_modified > os.path.getmtime(file_name):
            logging.debug('Will redownload {} due to modification on server'.format(file_name))
            return True
    if 'content-length' in download_head.headers:
        file_size = int(download_head.headers['content-length'])
        if file_size != os.path.getsize(file_name):
            logging.debug('Will redownload {} due to size mismatch'.format(file_name))
            return True
        else:
            return False
    logging.debug('Will redownload {} due to missing Content-Length header'.format(file_name))
    return True


def main():
    # parse options
    parser = argparse.ArgumentParser(description="Load external data into a database")

    parser.add_argument("-r", "--redownload", action="store_true",
                        help="Redownload external files, even if not required")
    parser.add_argument("-f", "--force", action="store_true",
                        help="Recreate database objects, even if not required")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store",
                        help="Override data download directory")

    parser.add_argument("-d", "--database", action="store",
                        help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store",
                        help="Override database server port")
    parser.add_argument("-U", "--username", action="store",
                        help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Only report serious problems")

    opts = parser.parse_args()

    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If the DB options are unspecified both on the command line and in
        # the config file, libpq will pick what to use with the None
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
        with requests.Session() as s, \
                psycopg2.connect(database=database,
                                 host=host, port=port,
                                 user=user) as conn:

            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])

            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names
                # Even if there was code to escape them properly here, you
                # don't want them in a style with all the quoting headaches
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    raise RuntimeError("Only ASCII alphanumeric table names are supported")

                workingdir = os.path.join(data_dir, name)
                # Clean up anything left over from an aborted run
                shutil.rmtree(workingdir, ignore_errors=True)

                os.makedirs(workingdir, exist_ok=True)

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()

                file_name = os.path.join(data_dir, os.path.basename(source["url"]))
                last_modified = None
                if opts.redownload or should_redownload(s, source["url"], file_name):
                    logging.info("Downloading file {}".format(file_name))
                    download = s.get(source["url"], stream=True)
                    download.raise_for_status()
                    with open(file_name, "wb") as f:
                        for chunk in download.iter_content(chunk_size=8388608):  # 8 MiB chunks
                            f.write(chunk)
                    try:
                        last_modified = time.mktime(email.utils.parsedate(download.headers['last-modified']))
                        os.utime(file_name, (time.time(), last_modified))
                    except (KeyError, TypeError, OverflowError, ValueError):
                        # KeyError will be raised if the Last-Modified header is missing
                        # TypeError will be raised if the header did not contain a valid date
                        # OverflowError/ValueError may come from the mktime invocation
                        pass

                if last_modified is None:
                    last_modified = os.path.getmtime(file_name)

                if opts.force or str(last_modified) != this_table.last_modified():
                    if "archive" in source and source["archive"]["format"] == "zip":
                        logging.info("Extracting files from archive {}".format(file_name))
                        with zipfile.ZipFile(file_name, "r") as zipf:
                            for member in source["archive"]["files"]:
                                zipf.extract(member, workingdir)

                    ogrpg = "PG:dbname={}".format(database)

                    if port is not None:
                        ogrpg = ogrpg + " port={}".format(port)
                    if user is not None:
                        ogrpg = ogrpg + " user={}".format(user)
                    if host is not None:
                        ogrpg = ogrpg + " host={}".format(host)

                    ogrcommand = ["ogr2ogr",
                                  '-f', 'PostgreSQL',
                                  '-lco', 'GEOMETRY_NAME=way',
                                  '-lco', 'SPATIAL_INDEX=FALSE',
                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                    if "ogropts" in source:
                        ogrcommand += source["ogropts"]

                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]

                    logging.info("Importing data from file {}".format(source["file"]))
                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))

                    # ogr2ogr can raise errors here, so they need to be caught
                    try:
                        subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        # Add more detail on stdout for the logs
                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
                        logging.critical("stdout:\n{}".format(e.output))
                        logging.critical("stderr:\n{}".format(e.stderr))
                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))

                    this_table.index()
                    this_table.replace(str(last_modified))
                else:
                    logging.info("Table {} is up to date".format(name))


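# A hypothetical invocation, assuming a database named "gis" and the default
# external-data.yml in the working directory:
#   ./get-external-data.py --database gis --verbose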
if __name__ == '__main__':
    main()