#!/usr/bin/env python3
'''This script loads quasi-static data into a PostGIS database for rendering
maps. It differs from the usual scripts for this task in that it takes its
configuration from a file rather than being a series of shell commands.

Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be run multiple times in parallel. This is not normally
  a concern because the script is likely to be called daily or less often, not
  every minute.
- Usage patterns will be similar to typical map rendering
'''
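
# The configuration file is YAML shaped like the following. The keys under
# settings and sources mirror what main() reads; the source name, URL, and
# file names here are hypothetical placeholders:
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#     database: gis        # optional; host, port, and username may also be
#                          # set here, otherwise libpq defaults apply
#   sources:
#     example_polygons:
#       url: https://example.com/example_polygons.zip
#       file: example_polygons.shp
#       archive:           # optional; only zip archives are handled
#         format: zip
#         files:
#           - example_polygons.shp
#           - example_polygons.shx
#           - example_polygons.dbf
#           - example_polygons.prj
#       ogropts:           # optional extra ogr2ogr arguments
#         - "-t_srs"
#         - "EPSG:3857"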
import yaml
import os
import os.path
import re
import argparse
import shutil

# modules for getting data
import zipfile
import requests
import time
import email.utils

# modules for converting and postgres loading
import subprocess
import psycopg2

import logging

def database_setup(conn, temp_schema, schema, metadata_table):
    # Create the temporary loading schema and the metadata table used to
    # track when each table was last changed
    with conn.cursor() as cur:
        cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
                    .format(temp_schema=temp_schema))
        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                     ''' (name text primary key, last_modified text);''')
                    .format(schema=schema, metadata_table=metadata_table))
    conn.commit()
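
# After setup, the metadata table holds one row per loaded table. The
# last_modified values are Unix timestamps stored as text, e.g. (contents
# illustrative):
#
#   name             | last_modified
#   -----------------+---------------
#   example_polygons | 1445412480.0
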
class Table:
    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table

    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

    # Get the last-modified date recorded in the metadata table, or '' if
    # this table has never been loaded
    def last_modified(self):
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]
        return ''
    def index(self):
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll
            # get clustered towards the end
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates an ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Sorting static tables helps rendering performance and reclaims
            # the space freed by the column drop above
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

        # VACUUM can't be run in a transaction, so autocommit needs to be
        # turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            self._conn.autocommit = old_autocommit
    def replace(self, new_last_modified):
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # Update the metadata table, inserting a row if this table has
            # never been loaded before
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
        self._conn.commit()

def should_redownload(req, url, file_name):
    # Decide whether the local copy of a download is stale by comparing the
    # upstream Last-Modified and Content-Length headers against the file
    if not os.path.isfile(file_name):
        return True
    download_head = req.head(url, allow_redirects=True)
    if 'last-modified' in download_head.headers:
        # parsedate() returns None for a malformed date, so guard before
        # calling mktime()
        parsed_date = email.utils.parsedate(download_head.headers['last-modified'])
        if parsed_date is not None and time.mktime(parsed_date) > os.path.getmtime(file_name):
            logging.debug('Will redownload {} due to modification on server'.format(file_name))
            return True
    if 'content-length' in download_head.headers:
        file_size = int(download_head.headers['content-length'])
        if file_size != os.path.getsize(file_name):
            logging.debug('Will redownload {} due to size mismatch'.format(file_name))
            return True
        else:
            return False
    logging.debug('Will redownload {} due to missing Content-Length header'.format(file_name))
    return True
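
# For reference, the Last-Modified handling above (and in main() below) parses
# HTTP dates like so; the date is illustrative, and the mktime() result
# depends on the local timezone:
#
#   >>> email.utils.parsedate('Wed, 21 Oct 2015 07:28:00 GMT')
#   (2015, 10, 21, 7, 28, 0, 0, 1, -1)
#   >>> time.mktime(email.utils.parsedate('Wed, 21 Oct 2015 07:28:00 GMT'))
#   1445412480.0
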
def main():
    # parse options
    parser = argparse.ArgumentParser(description="Load external data into a database")

    parser.add_argument("-r", "--redownload", action="store_true",
                        help="Redownload external files, even if not required")
    parser.add_argument("-f", "--force", action="store_true",
                        help="Recreate database objects, even if not required")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store", help="Override data download directory")

    parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store", help="Override database server port")
    parser.add_argument("-U", "--username", action="store", help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")

    opts = parser.parse_args()
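
    # Typical invocations look like this (the database name is illustrative):
    #   ./get-external-data.py
    #   ./get-external-data.py --force -d gis -v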
    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If the DB options are unspecified both on the command line and in
        # the config file, libpq will pick what to use with the None values
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
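
        # With all of these left as None, psycopg2/libpq falls back to the
        # usual PGDATABASE/PGHOST/PGPORT/PGUSER environment variables and its
        # built-in defaults when connecting below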
        with requests.Session() as s, \
             psycopg2.connect(database=database,
                              host=host, port=port,
                              user=user) as conn:

            s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])

            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names. Even if there was
                # code to escape them properly here, you don't want names like
                # that in a style, with all the quoting headaches they bring
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    raise RuntimeError("Only ASCII alphanumeric table names are supported")

                workingdir = os.path.join(data_dir, name)
                # Clean up anything left over from an aborted run
                shutil.rmtree(workingdir, ignore_errors=True)
                os.makedirs(workingdir, exist_ok=True)

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()

                file_name = os.path.join(data_dir, os.path.basename(source["url"]))
                last_modified = None
                if opts.redownload or should_redownload(s, source["url"], file_name):
                    logging.info("Downloading file {}".format(file_name))
                    download = s.get(source["url"], stream=True)
                    download.raise_for_status()
                    with open(file_name, "wb") as f:
                        # Stream the download to disk in 8 MB chunks
                        for chunk in download.iter_content(chunk_size=8388608):
                            f.write(chunk)
                    try:
                        last_modified = time.mktime(email.utils.parsedate(download.headers['last-modified']))
                        # Set the file's mtime to the server's Last-Modified
                        os.utime(file_name, (time.time(), last_modified))
                    except (KeyError, TypeError, OverflowError, ValueError):
                        # KeyError will be raised if the Last-Modified header is missing
                        # TypeError will be raised if the header did not contain a valid date
                        # OverflowError/ValueError may come from the mktime invocation
                        pass

                if last_modified is None:
                    last_modified = os.path.getmtime(file_name)

                if opts.force or str(last_modified) != this_table.last_modified():
                    if "archive" in source and source["archive"]["format"] == "zip":
                        logging.info("Extracting files from archive {}".format(file_name))
                        # Extract only the listed members, closing the archive
                        # when done
                        with zipfile.ZipFile(file_name, "r") as zip_file:
                            for member in source["archive"]["files"]:
                                zip_file.extract(member, workingdir)

                    ogrpg = "PG:dbname={}".format(database)
                    if port is not None:
                        ogrpg = ogrpg + " port={}".format(port)
                    if user is not None:
                        ogrpg = ogrpg + " user={}".format(user)
                    if host is not None:
                        ogrpg = ogrpg + " host={}".format(host)

                    ogrcommand = ["ogr2ogr",
                                  '-f', 'PostgreSQL',
                                  '-lco', 'GEOMETRY_NAME=way',
                                  '-lco', 'SPATIAL_INDEX=FALSE',
                                  '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                                  '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                    if "ogropts" in source:
                        ogrcommand += source["ogropts"]

                    ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]
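
                    # With the example configuration sketched at the top of
                    # this file, the assembled command would resemble (all
                    # names illustrative):
                    #   ogr2ogr -f PostgreSQL -lco GEOMETRY_NAME=way \
                    #       -lco SPATIAL_INDEX=FALSE \
                    #       -lco EXTRACT_SCHEMA_FROM_LAYER_NAME=YES \
                    #       -nln loading.example_polygons -t_srs EPSG:3857 \
                    #       "PG:dbname=gis" data/example_polygons/example_polygons.shp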
                    logging.info("Importing data from file {}".format(source["file"]))
                    logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))

                    # ogr2ogr can raise errors here, so they need to be caught
                    try:
                        subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        # Add more detail, including stdout and stderr, to the logs
                        logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
                        logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
                        logging.critical("stdout:\n{}".format(e.output))
                        logging.critical("stderr:\n{}".format(e.stderr))
                        raise RuntimeError("ogr2ogr error when loading table {}".format(name))

                    this_table.index()
                    this_table.replace(str(last_modified))
                else:
                    logging.info("Table {} is up to date".format(name))


if __name__ == '__main__':
    main()