#!/usr/bin/env python3
'''This script is designed to load quasi-static data into a PostGIS database
for rendering maps. It differs from the usual scripts to do this in that it is
designed to take its configuration from a file rather than be a series of shell
Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
cost of loading it
- The script will not be running multiple times in parallel. This is not
normally likely because the script is likely to be called daily or less,
not minutely.
- Usage patterns will be similar to typical map rendering
import yaml
import os
import os.path
import re
import argparse
import shutil
# modules for getting data
import zipfile
import requests
import io
import time
import email.utils
# modules for converting and postgres loading
import subprocess
import psycopg2
import logging
def database_setup(conn, temp_schema, schema, metadata_table):
with conn.cursor() as cur:
cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
''' (name text primary key, last_modified text);''')
.format(schema=schema, metadata_table=metadata_table))
class Table:
def __init__(self, name, conn, temp_schema, schema, metadata_table):
self._name = name
self._conn = conn
self._temp_schema = temp_schema
self._dst_schema = schema
self._metadata_table = metadata_table
# Clean up the temporary schema in preperation for loading
def clean_temp(self):
with self._conn.cursor() as cur:
cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
.format(name=self._name, temp_schema=self._temp_schema))
# get the last modified date from the metadata table
def last_modified(self):
with self._conn.cursor() as cur:
cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
results = cur.fetchone()
if results is not None:
return results[0]
return ''
def index(self):
with self._conn.cursor() as cur:
# Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
.format(name=self._name, temp_schema=self._temp_schema))
# ogr creates a ogc_fid column we don't need
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
.format(name=self._name, temp_schema=self._temp_schema))
# Null geometries are useless for rendering
cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
# sorting static tables helps performance and reduces size from the column drop above
cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
'''CLUSTER "{temp_schema}"."{name}" '''
'''USING "{name}_order";'''
'''DROP INDEX "{temp_schema}"."{name}_order";'''
'''CREATE INDEX ON "{temp_schema}"."{name}" '''
'''USING GIST (way) WITH (fillfactor=100);''')
.format(name=self._name, temp_schema=self._temp_schema))
# Reset autovacuum. The table is static, so this doesn't really
# matter since it'll never need a vacuum.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
.format(name=self._name, temp_schema=self._temp_schema))
# VACUUM can't be run in transaction, so autocommit needs to be turned on
old_autocommit = self._conn.autocommit
self._conn.autocommit = True
with self._conn.cursor() as cur:
cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
.format(name=self._name, temp_schema=self._temp_schema))
self._conn.autocommit = old_autocommit
def replace(self, new_last_modified):
with self._conn.cursor() as cur:
cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
'''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
.format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))
# We checked if the metadata table had this table way up above
cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
if cur.rowcount == 0:
cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
'''(name, last_modified) VALUES (%s, %s)''')
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[self._name, new_last_modified])
cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[new_last_modified, self._name])
def should_redownload(req, url, file_name):
if not os.path.isfile(file_name):
return True
download_head = req.head(url, allow_redirects=True)
if 'last-modified' in download_head.headers:
last_modified = time.mktime(email.utils.parsedate(download_head.headers['last-modified']))
if last_modified > os.path.getmtime(file_name):
logging.debug('Will redownload {} due to modification on server'.format(file_name))
return True
if 'content-length' in download_head.headers:
file_size = int(download_head.headers['content-length'])
if file_size != os.path.getsize(file_name):
logging.debug('Will redownload {} due to size mismatch'.format(file_name))
return True
return False
logging.debug('Will redownload {} due to missing Content-Length header'.format(file_name))
return True
def main():
# parse options
parser = argparse.ArgumentParser(description="Load external data into a database")
parser.add_argument("-r", "--redownload", action="store_true", help="Redownload external files, even if not required")
parser.add_argument("-f", "--force", action="store_true", help="Recreate database objects, even if not required")
parser.add_argument("-c", "--config", action="store", default="external-data.yml",
help="Name of configuration file (default external-data.yml)")
parser.add_argument("-D", "--data", action="store", help="Override data download directory")
parser.add_argument("-d", "--database", action="store", help="Override database name to connect to")
parser.add_argument("-H", "--host", action="store",
help="Override database server host or socket directory")
parser.add_argument("-p", "--port", action="store", help="Override database server port")
parser.add_argument("-U", "--username", action="store", help="Override database user name")
parser.add_argument("-v", "--verbose", action="store_true", help="Be more verbose. Overrides -q")
parser.add_argument("-q", "--quiet", action="store_true", help="Only report serious problems")
opts = parser.parse_args()
if opts.verbose:
elif opts.quiet:
with open(opts.config) as config_file:
config = yaml.safe_load(config_file)
data_dir = opts.data or config["settings"]["data_dir"]
os.makedirs(data_dir, exist_ok=True)
# If the DB options are unspecified in both on the command line and in the
# config file, libpq will pick what to use with the None
database = opts.database or config["settings"].get("database")
host = opts.host or config["settings"].get("host")
port = opts.port or config["settings"].get("port")
user = opts.username or config["settings"].get("username")
with requests.Session() as s, \
host=host, port=port,
user=user) as conn:
s.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})
# DB setup
database_setup(conn, config["settings"]["temp_schema"],
for name, source in config["sources"].items():
logging.info("Checking table {}".format(name))
# Don't attempt to handle strange names
# Even if there was code to escape them properly here, you don't want
# in a style with all the quoting headaches
if not re.match('''^[a-zA-Z0-9_]+$''', name):
raise RuntimeError("Only ASCII alphanumeric table are names supported")
workingdir = os.path.join(data_dir, name)
# Clean up anything left over from an aborted run
shutil.rmtree(workingdir, ignore_errors=True)
os.makedirs(workingdir, exist_ok=True)
this_table = Table(name, conn,
file_name = os.path.join(data_dir, os.path.basename(source["url"]))
last_modified = None
if opts.redownload or should_redownload(s, source["url"], file_name):
logging.info("Downloading file {}".format(file_name))
download = s.get(source["url"], stream=True)
with open(file_name, "wb") as f:
for chunk in download.iter_content(chunk_size=8388608):
last_modified = time.mktime(email.utils.parsedate(download.headers['last-modified']))
os.utime(file_name, (time.time(), last_modified))
except (KeyError, TypeError, OverflowError, ValueError):
# KeyError will be raised if Last-Modified header is missing
# TypeError will be raised if header did not contain a valid date
# OverflowError/ValueError may come from mktime invocation
if last_modified is None:
last_modified = os.path.getmtime(file_name)
if opts.force or str(last_modified) != this_table.last_modified():
if "archive" in source and source["archive"]["format"] == "zip":
logging.info("Extracting files from archive {}".format(file_name))
zip = zipfile.ZipFile(file_name, "r")
for member in source["archive"]["files"]:
zip.extract(member, workingdir)
ogrpg = "PG:dbname={}".format(database)
if port is not None:
ogrpg = ogrpg + " port={}".format(port)
if user is not None:
ogrpg = ogrpg + " user={}".format(user)
if host is not None:
ogrpg = ogrpg + " host={}".format(host)
ogrcommand = ["ogr2ogr",
'-f', 'PostgreSQL',
'-lco', 'GEOMETRY_NAME=way',
'-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]
if "ogropts" in source:
ogrcommand += source["ogropts"]
ogrcommand += [ogrpg, os.path.join(workingdir, source["file"])]
logging.info("Importing data from file {}".format(source["file"]))
logging.debug("running {}".format(subprocess.list2cmdline(ogrcommand)))
# ogr2ogr can raise errors here, so they need to be caught
subprocess.check_output(ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
except subprocess.CalledProcessError as e:
# Add more detail on stdout for the logs
logging.critical("ogr2ogr returned {} with layer {}".format(e.returncode, name))
logging.critical("Command line was {}".format(subprocess.list2cmdline(e.cmd)))
raise RuntimeError("ogr2ogr error when loading table {}".format(name))
logging.info("Table {} is up to date".format(name))
if __name__ == '__main__':