Old tiles.ttss.pl backed using MapProxy
Jacek Kowalski
2020-06-09 a32ed5ec142a74385fe24f9e1adecfc86847f543
commit | author | age
d1e1f5 1 #!/usr/bin/env python3
JK 2 '''This script is designed to load quasi-static data into a PostGIS database
3 for rendering maps. It differs from the usual scripts to do this in that it is
4 designed to take its configuration from a file rather than be a series of shell
5 commands.
6
7 Some implicit assumptions are
8 - Time spent querying (rendering) the data is more valuable than the one-time
9     cost of loading it
10 - The script will not be running multiple times in parallel. This is not
11     normally likely because the script is likely to be called daily or less,
12     not minutely.
13 - Usage patterns will be similar to typical map rendering
14 '''
15
16 import yaml
17 import os
18 import os.path
19 import re
20 import argparse
21 import shutil
22
23 # modules for getting data
24 import zipfile
25 import requests
26 import io
27 import time
28 import email.utils
29
30 # modules for converting and postgres loading
31 import subprocess
32 import psycopg2
33
34 import logging
35
36
37 def database_setup(conn, temp_schema, schema, metadata_table):
38     with conn.cursor() as cur:
39             cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
40                         .format(temp_schema=temp_schema))
41             cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
42                          ''' (name text primary key, last_modified text);''')
43                         .format(schema=schema, metadata_table=metadata_table))
44     conn.commit()
45
46
47 class Table:
48     def __init__(self, name, conn, temp_schema, schema, metadata_table):
49         self._name = name
50         self._conn = conn
51         self._temp_schema = temp_schema
52         self._dst_schema = schema
53         self._metadata_table = metadata_table
54
55     # Clean up the temporary schema in preperation for loading
56     def clean_temp(self):
57         with self._conn.cursor() as cur:
58             cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
59                         .format(name=self._name, temp_schema=self._temp_schema))
60         self._conn.commit()
61
62     # get the last modified date from the metadata table
63     def last_modified(self):
64         with self._conn.cursor() as cur:
65             cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
66                         .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
67             results = cur.fetchone()
68             if results is not None:
69                 return results[0]
70         return ''
71
72     def index(self):
73         with self._conn.cursor() as cur:
74             # Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
75             cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
76                         .format(name=self._name, temp_schema=self._temp_schema))
77             # ogr creates a ogc_fid column we don't need
78             cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
79                         .format(name=self._name, temp_schema=self._temp_schema))
80
81             # Null geometries are useless for rendering
82             cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
83                         .format(name=self._name, temp_schema=self._temp_schema))
84             cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
85                         .format(name=self._name, temp_schema=self._temp_schema))
86             # sorting static tables helps performance and reduces size from the column drop above
87             cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
88                          '''(ST_Envelope(way));'''
89                          '''CLUSTER "{temp_schema}"."{name}" '''
90                          '''USING "{name}_order";'''
91                          '''DROP INDEX "{temp_schema}"."{name}_order";'''
92                          '''CREATE INDEX ON "{temp_schema}"."{name}" '''
93                          '''USING GIST (way) WITH (fillfactor=100);''')
94                         .format(name=self._name, temp_schema=self._temp_schema))
95             # Reset autovacuum. The table is static, so this doesn't really
96             # matter since it'll never need a vacuum.
97             cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
98                         .format(name=self._name, temp_schema=self._temp_schema))
99         self._conn.commit()
100
101         # VACUUM can't be run in transaction, so autocommit needs to be turned on
102         old_autocommit = self._conn.autocommit
103         try:
104             self._conn.autocommit = True
105             with self._conn.cursor() as cur:
106                 cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
107                             .format(name=self._name, temp_schema=self._temp_schema))
108         finally:
109             self._conn.autocommit = old_autocommit
110
111     def replace(self, new_last_modified):
112         with self._conn.cursor() as cur:
113             cur.execute('''BEGIN;''')
114             cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
115                          '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
116                         .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))
117
118             # We checked if the metadata table had this table way up above
119             cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
120                         .format(schema=self._dst_schema, metadata_table=self._metadata_table),
121                         [self._name])
122             if cur.rowcount == 0:
123                 cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
124                             '''(name, last_modified) VALUES (%s, %s)''')
125                             .format(schema=self._dst_schema, metadata_table=self._metadata_table),
126                             [self._name, new_last_modified])
127             else:
128                 cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
129                             .format(schema=self._dst_schema, metadata_table=self._metadata_table),
130                             [new_last_modified, self._name])
131         self._conn.commit()
132
133
134 def should_redownload(req, url, file_name):
135     if not os.path.isfile(file_name):
136         return True
137     download_head = req.head(url, allow_redirects=True)
138     if 'last-modified' in download_head.headers:
139         last_modified = time.mktime(email.utils.parsedate(download_head.headers['last-modified']))
140         if last_modified > os.path.getmtime(file_name):
141             logging.debug('Will redownload {} due to modification on server'.format(file_name))
142             return True
143     if 'content-length' in download_head.headers:
144         file_size = int(download_head.headers['content-length'])
145         if file_size != os.path.getsize(file_name):
146             logging.debug('Will redownload {} due to size mismatch'.format(file_name))
147             return True
148         else:
149             return False
150     logging.debug('Will redownload {} due to missing Content-Length header'.format(file_name))
151     return True
152
153
154 def main():
155     # parse options
156     parser = argparse.ArgumentParser(description="Load external data into a database")
157
158     parser.add_argument("-r", "--redownload", action="store_true", help="Redownload external files, even if not required")
159     parser.add_argument("-f", "--force", action="store_true", help="Recreate database objects, even if not required")
160
161     parser.add_argument("-c", "--config", action="store", default="external-data.yml",
162                         help="Name of configuration file (default external-data.yml)")
163     parser.add_argument("-D", "--data", action="store", help="Override data download directory")
164
165     parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
166     parser.add_argument("-H", "--host", action="store",
167                         help="Override database server host or socket directory")
168     parser.add_argument("-p", "--port", action="store", help="Override database server port")
169     parser.add_argument("-U", "--username", action="store", help="Override database user name")
170     parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
171     parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")
172
173     opts = parser.parse_args()
174
175     if opts.verbose:
176         logging.basicConfig(level=logging.DEBUG)
177     elif opts.quiet:
178         logging.basicConfig(level=logging.WARNING)
179     else:
180         logging.basicConfig(level=logging.INFO)
181
182     with open(opts.config) as config_file:
183         config = yaml.safe_load(config_file)
184         data_dir = opts.data or config["settings"]["data_dir"]
185         os.makedirs(data_dir, exist_ok=True)
186
187         # If the DB options are unspecified in both on the command line and in the
188         # config file, libpq will pick what to use with the None
189         database = opts.database or config["settings"].get("database")
190         host = opts.host or config["settings"].get("host")
191         port = opts.port or config["settings"].get("port")
192         user = opts.username or config["settings"].get("username")
193         with requests.Session() as s, \
194             psycopg2.connect(database=database,
195                              host=host, port=port,
196                              user=user) as conn:
197
198             s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})
199
200             # DB setup
201             database_setup(conn, config["settings"]["temp_schema"],
202                            config["settings"]["schema"],
203                            config["settings"]["metadata_table"])
204
205             for name, source in config["sources"].items():
206                 logging.info("Checking table {}".format(name))
207                 # Don't attempt to handle strange names
208                 # Even if there was code to escape them properly here, you don't want
209                 # in a style with all the quoting headaches
210                 if not re.match('''^[a-zA-Z0-9_]+$''', name):
211                     raise RuntimeError("Only ASCII alphanumeric table are names supported")
212
213                 workingdir = os.path.join(data_dir, name)
214                 # Clean up anything left over from an aborted run
215                 shutil.rmtree(workingdir, ignore_errors=True)
216
217                 os.makedirs(workingdir, exist_ok=True)
218
219                 this_table = Table(name, conn,
220                                    config["settings"]["temp_schema"],
221                                    config["settings"]["schema"],
222                                    config["settings"]["metadata_table"])
223                 this_table.clean_temp()
224
225                 file_name = os.path.join(data_dir, os.path.basename(source["url"]))
226                 last_modified = None
227                 if opts.redownload or should_redownload(s, source["url"], file_name):
228                     logging.info("Downloading file {}".format(file_name))
229                     download = s.get(source["url"], stream=True)
230                     download.raise_for_status()
231                     with open(file_name, "wb") as f:
232                         for chunk in download.iter_content(chunk_size=8388608):
233                             f.write(chunk)
234                     try:
235                         last_modified = time.mktime(email.utils.parsedate(download.headers['last-modified']))
236                         os.utime(file_name, (time.time(), last_modified))
237                     except (KeyError, TypeError, OverflowError, ValueError):
238                         # KeyError will be raised if Last-Modified header is missing
239                         # TypeError will be raised if header did not contain a valid date
240                         # OverflowError/ValueError may come from mktime invocation
241                         pass
242
243                 if last_modified is None:
244                     last_modified = os.path.getmtime(file_name)
245
246                 if opts.force or str(last_modified) != this_table.last_modified():
247                     if "archive" in source and source["archive"]["format"] == "zip":
248                         logging.info("Extracting files from archive {}".format(file_name))
249                         zip = zipfile.ZipFile(file_name, "r")
250                         for member in source["archive"]["files"]:
251                             zip.extract(member, workingdir)
252
253                     ogrpg = "PG:dbname={}".format(database)
254
255                     if port is not None:
256                         ogrpg = ogrpg + " port={}".format(port)
257                     if user is not None:
258                         ogrpg = ogrpg + " user={}".format(user)
259                     if host is not None:
260                         ogrpg = ogrpg + " host={}".format(host)
261
262                     ogrcommand = ["ogr2ogr",
263                                   '-f', 'PostgreSQL',
264                                   '-lco', 'GEOMETRY_NAME=way',
265                                   '-lco', 'SPATIAL_INDEX=FALSE',
266                                   '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
267                                   '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]
268
269                     if "ogropts" in source:
270                         ogrcommand += source["ogropts"]
271
272                     ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]
273
274                     logging.info("Importing data from file {}".format(source["file"]))
275                     logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))
276
277                     # ogr2ogr can raise errors here, so they need to be caught
278                     try:
279                         subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
280                     except subprocess.CalledProcessError as e:
281                         # Add more detail on stdout for the logs
282                         logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
283                         logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
284                         logging.critical("stdout:\n{}".format(e.output))
285                         logging.critical("stderr:\n{}".format(e.stderr))
286                         raise RuntimeError("ogr2ogr error when loading table {}".format(name))
287
288                     this_table.index()
289                     this_table.replace(str(last_modified))
290                 else:
291                     logging.info("Table {} is up to date".format(name))
292
293
294 if __name__ == '__main__':
295     main()