From 04509b48dbd3fc1c52183063dd69a1b5364b1ffb Mon Sep 17 00:00:00 2001 From: Maksim Andrianov Date: Fri, 29 May 2020 13:19:47 +0300 Subject: [PATCH] [generator][python] Added airmaps. --- tools/python/airmaps/README.md | 17 + tools/python/airmaps/__init__.py | 12 + tools/python/airmaps/dags/build_coastline.py | 93 ++ tools/python/airmaps/dags/build_maps.py | 155 ++++ tools/python/airmaps/dags/update_planet.py | 83 ++ tools/python/airmaps/instruments/settings.py | 62 ++ tools/python/airmaps/instruments/storage.py | 27 + tools/python/airmaps/instruments/utils.py | 48 + tools/python/airmaps/requirements.txt | 5 + tools/python/airmaps/sandbox/.dockerignore | 4 + tools/python/airmaps/sandbox/README.md | 76 ++ .../python/airmaps/sandbox/airmaps/Dockerfile | 29 + .../airmaps/sandbox/airmaps/airflow.cfg | 856 ++++++++++++++++++ .../airmaps/sandbox/airmaps/airmaps.ini | 82 ++ .../sandbox/airmaps/run_airmaps_service.sh | 13 + tools/python/airmaps/sandbox/build.sh | 14 + tools/python/airmaps/sandbox/clean.sh | 10 + .../python/airmaps/sandbox/create_storage.sh | 10 + .../python/airmaps/sandbox/docker-compose.yml | 41 + .../airmaps/var/etc/airmaps.ini.default | 82 ++ .../descriptions/descriptions_downloader.py | 7 +- tools/python/maps_generator/generator/env.py | 26 +- .../maps_generator/generator/generation.py | 84 +- .../maps_generator/generator/settings.py | 16 +- .../python/maps_generator/generator/stages.py | 73 +- .../generator/stages_declaration.py | 10 +- .../python/maps_generator/generator/status.py | 27 +- .../python/maps_generator/generator/steps.py | 9 +- tools/python/maps_generator/maps_generator.py | 19 +- tools/python/maps_generator/utils/file.py | 10 +- 30 files changed, 1898 insertions(+), 102 deletions(-) create mode 100644 tools/python/airmaps/README.md create mode 100644 tools/python/airmaps/__init__.py create mode 100644 tools/python/airmaps/dags/build_coastline.py create mode 100644 tools/python/airmaps/dags/build_maps.py create mode 100644 tools/python/airmaps/dags/update_planet.py create mode 100644 tools/python/airmaps/instruments/settings.py create mode 100644 tools/python/airmaps/instruments/storage.py create mode 100644 tools/python/airmaps/instruments/utils.py create mode 100644 tools/python/airmaps/requirements.txt create mode 100644 tools/python/airmaps/sandbox/.dockerignore create mode 100644 tools/python/airmaps/sandbox/README.md create mode 100644 tools/python/airmaps/sandbox/airmaps/Dockerfile create mode 100644 tools/python/airmaps/sandbox/airmaps/airflow.cfg create mode 100644 tools/python/airmaps/sandbox/airmaps/airmaps.ini create mode 100755 tools/python/airmaps/sandbox/airmaps/run_airmaps_service.sh create mode 100755 tools/python/airmaps/sandbox/build.sh create mode 100755 tools/python/airmaps/sandbox/clean.sh create mode 100755 tools/python/airmaps/sandbox/create_storage.sh create mode 100644 tools/python/airmaps/sandbox/docker-compose.yml create mode 100644 tools/python/airmaps/var/etc/airmaps.ini.default diff --git a/tools/python/airmaps/README.md b/tools/python/airmaps/README.md new file mode 100644 index 0000000000..8e9ce6ae25 --- /dev/null +++ b/tools/python/airmaps/README.md @@ -0,0 +1,17 @@ +# airmaps - building of maps using airflow. + + +## Storage +Repository of result and temporary files. +Currently, the storage is a webdav server. + + +## Description of DAGs: +1. Update_planet - updates .o5m planet file. + +2. Build_coastline - builds coastline files. + +3. 
Generate_open_source_maps - builds free maps for maps.me + + +All results will be published on the storage. diff --git a/tools/python/airmaps/__init__.py b/tools/python/airmaps/__init__.py new file mode 100644 index 0000000000..9bdc45cf6d --- /dev/null +++ b/tools/python/airmaps/__init__.py @@ -0,0 +1,12 @@ +import os + +from airmaps.instruments import settings + +CONFIG_PATH = os.path.join( + os.path.dirname(os.path.join(os.path.realpath(__file__))), + "var", + "etc", + "airmaps.ini", +) + +settings.init(CONFIG_PATH) diff --git a/tools/python/airmaps/dags/build_coastline.py b/tools/python/airmaps/dags/build_coastline.py new file mode 100644 index 0000000000..d2daebfd0e --- /dev/null +++ b/tools/python/airmaps/dags/build_coastline.py @@ -0,0 +1,93 @@ +import logging +import os +import shutil +from datetime import timedelta + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + +from airmaps.instruments import settings +from airmaps.instruments import storage +from airmaps.instruments.utils import get_latest_filename +from airmaps.instruments.utils import make_rm_build_task +from airmaps.instruments.utils import put_current_date_in_filename +from airmaps.instruments.utils import rm_build +from maps_generator.generator import stages_declaration as sd +from maps_generator.generator.env import Env +from maps_generator.generator.env import WORLD_COASTS_NAME +from maps_generator.maps_generator import run_generation + +logger = logging.getLogger("airmaps") + + +DAG = DAG( + "Build_coastline", + schedule_interval=timedelta(days=1), + default_args={ + "owner": "MAPS.ME", + "depends_on_past": True, + "start_date": days_ago(0), + "email": settings.EMAILS, + "email_on_failure": True, + "email_on_retry": False, + "retries": 0, + "retry_delay": timedelta(minutes=5), + "priority_weight": 1, + }, +) + +COASTLINE_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/coasts" + + +def publish_coastline(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + env = Env(build_name=build_name) + for name in (f"{WORLD_COASTS_NAME}.geom", f"{WORLD_COASTS_NAME}.rawgeom"): + coastline = put_current_date_in_filename(name) + latest = get_latest_filename(name) + coastline_full = os.path.join(env.paths.coastline_path, coastline) + latest_full = os.path.join(env.paths.coastline_path, latest) + shutil.move(os.path.join(env.paths.coastline_path, name), coastline_full) + os.symlink(coastline, latest_full) + + storage.wd_publish(coastline_full, f"{COASTLINE_STORAGE_PATH}/{coastline}") + storage.wd_publish(latest_full, f"{COASTLINE_STORAGE_PATH}/{latest}") + + +def build_coastline(**kwargs): + env = Env() + kwargs["ti"].xcom_push(key="build_name", value=env.build_name) + + run_generation( + env, + ( + sd.StageDownloadAndConvertPlanet(), + sd.StageCoastline(use_old_if_fail=False), + sd.StageCleanup(), + ), + ) + env.finish() + + +BUILD_COASTLINE_TASK = PythonOperator( + task_id="Build_coastline_task", + provide_context=True, + python_callable=build_coastline, + on_failure_callback=lambda ctx: rm_build(**ctx), + dag=DAG, +) + + +PUBLISH_COASTLINE_TASK = PythonOperator( + task_id="Publish_coastline_task", + provide_context=True, + python_callable=publish_coastline, + dag=DAG, +) + + +RM_BUILD_TASK = make_rm_build_task(DAG) + + +BUILD_COASTLINE_TASK >> PUBLISH_COASTLINE_TASK >> RM_BUILD_TASK diff --git a/tools/python/airmaps/dags/build_maps.py b/tools/python/airmaps/dags/build_maps.py new file mode 100644 index 0000000000..38a08167ed --- /dev/null +++ 
b/tools/python/airmaps/dags/build_maps.py @@ -0,0 +1,155 @@ +import logging +from datetime import timedelta + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + +from airmaps.instruments import settings +from airmaps.instruments import storage +from airmaps.instruments.utils import make_rm_build_task +from airmaps.instruments.utils import run_generation_from_first_stage +from maps_generator.generator import stages_declaration as sd +from maps_generator.generator.env import Env +from maps_generator.generator.env import PathProvider +from maps_generator.generator.env import get_all_countries_list +from maps_generator.maps_generator import run_generation + +logger = logging.getLogger("airmaps") + + +MAPS_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/maps" + + +class MapsGenerationDAG(DAG): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + build_prolog_task = PythonOperator( + task_id="Build_prolog_task", + provide_context=True, + python_callable=MapsGenerationDAG.build_prolog, + dag=self, + ) + + build_epilog_task = PythonOperator( + task_id="Build_epilog_task", + provide_context=True, + python_callable=MapsGenerationDAG.build_epilog, + dag=self, + ) + + publish_maps_task = PythonOperator( + task_id="Publish_maps_task", + provide_context=True, + python_callable=MapsGenerationDAG.publish_maps, + dag=self, + ) + + rm_build_task = make_rm_build_task(self) + + build_epilog_task >> publish_maps_task >> rm_build_task + for country in get_all_countries_list(PathProvider.borders_path()): + build_prolog_task >> self.make_mwm_operator(country) >> build_epilog_task + + @staticmethod + def get_params(namespace="env", **kwargs): + return kwargs.get("params", {}).get(namespace, {}) + + @staticmethod + def build_prolog(**kwargs): + params = MapsGenerationDAG.get_params(**kwargs) + env = Env(**params) + kwargs["ti"].xcom_push(key="build_name", value=env.build_name) + run_generation( + env, + ( + sd.StageDownloadAndConvertPlanet(), + sd.StageCoastline(), + sd.StagePreprocess(), + sd.StageFeatures(), + sd.StageDownloadDescriptions(), + ), + ) + + @staticmethod + def make_build_mwm_func(country): + def build_mwm(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + params = MapsGenerationDAG.get_params(**kwargs) + params.update({"build_name": build_name, "countries": [country,]}) + env = Env(**params) + # We need to check existing of mwm.tmp. It is needed if we want to + # build mwms from part of planet. 
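+ # (Here get_tmp_mwm_names() is expected to return at most one name, since this
+ # Env was created for a single country; an empty list means the planet slice
+ # does not cover that country and the mwm build is skipped.)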
+ tmp_mwm_name = env.get_tmp_mwm_names() + assert len(tmp_mwm_name) <= 1 + if not tmp_mwm_name: + logger.warning(f"mwm.tmp does not exist for {country}.") + return + + run_generation_from_first_stage(env, (sd.StageMwm(),), build_lock=False) + + return build_mwm + + @staticmethod + def build_epilog(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + params = MapsGenerationDAG.get_params(**kwargs) + params.update({"build_name": build_name}) + env = Env(**params) + run_generation_from_first_stage( + env, + ( + sd.StageCountriesTxt(), + sd.StageExternalResources(), + sd.StageLocalAds(), + sd.StageStatistics(), + sd.StageCleanup(), + ), + ) + env.finish() + + @staticmethod + def publish_maps(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + params = MapsGenerationDAG.get_params(**kwargs) + params.update({"build_name": build_name}) + env = Env(**params) + subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)["subdir"] + storage_path = f"{MAPS_STORAGE_PATH}/{subdir}" + storage.wd_publish(env.paths.mwm_path, f"{storage_path}/{env.mwm_version}/") + + def make_mwm_operator(self, country): + normalized_name = "__".join(country.lower().split()) + return PythonOperator( + task_id=f"Build_country_{normalized_name}_task", + provide_context=True, + python_callable=MapsGenerationDAG.make_build_mwm_func(country), + dag=self, + ) + + +PARAMS = {"storage": {"subdir": "open_source"}} +if settings.DEBUG: + PARAMS["env"] = { + # The planet file in debug mode does not contain Russia_Moscow territory. + # It is needed for testing. + "countries": ["Cuba", "Haiti", "Jamaica", "Cayman Islands", "Russia_Moscow"] + } + +OPEN_SOURCE_MAPS_GENERATION_DAG = MapsGenerationDAG( + "Generate_open_source_maps", + schedule_interval=timedelta(days=7), + default_args={ + "owner": "MAPS.ME", + "depends_on_past": True, + "start_date": days_ago(0), + "email": settings.EMAILS, + "email_on_failure": True, + "email_on_retry": False, + "retries": 0, + "retry_delay": timedelta(minutes=5), + "priority_weight": 1, + "params": PARAMS, + }, +) diff --git a/tools/python/airmaps/dags/update_planet.py b/tools/python/airmaps/dags/update_planet.py new file mode 100644 index 0000000000..dcab15c9da --- /dev/null +++ b/tools/python/airmaps/dags/update_planet.py @@ -0,0 +1,83 @@ +import logging +from datetime import timedelta + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + +from airmaps.instruments import settings +from airmaps.instruments import storage +from airmaps.instruments.utils import make_rm_build_task +from maps_generator.generator import stages_declaration as sd +from maps_generator.generator.env import Env +from maps_generator.maps_generator import run_generation +from maps_generator.utils.md5 import md5_ext + +logger = logging.getLogger("airmaps") + + +DAG = DAG( + "Update_planet", + schedule_interval=timedelta(days=1), + default_args={ + "owner": "MAPS.ME", + "depends_on_past": True, + "start_date": days_ago(0), + "email": settings.EMAILS, + "email_on_failure": True, + "email_on_retry": False, + "retries": 0, + "retry_delay": timedelta(minutes=5), + "priority_weight": 1, + }, +) + + +PLANET_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/planet_regular/planet-latest.o5m" + + +def update_planet(**kwargs): + env = Env() + kwargs["ti"].xcom_push(key="build_name", value=env.build_name) + + if settings.DEBUG: + env.add_skipped_stage(sd.StageUpdatePlanet) + + run_generation( + env, + ( + 
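+ # Stage order: download/convert the planet, refresh it with osmupdate
+ # (StageUpdatePlanet is skipped above when DEBUG is set), then clean up.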
sd.StageDownloadAndConvertPlanet(), + sd.StageUpdatePlanet(), + sd.StageCleanup(), + ), + ) + env.finish() + + +def publish_planet(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + env = Env(build_name=build_name) + storage.wd_publish(env.paths.planet_o5m, PLANET_STORAGE_PATH) + storage.wd_publish(md5_ext(env.paths.planet_o5m), md5_ext(PLANET_STORAGE_PATH)) + + +UPDATE_PLANET_TASK = PythonOperator( + task_id="Update_planet_task", + provide_context=True, + python_callable=update_planet, + dag=DAG, +) + + +PUBLISH_PLANET_TASK = PythonOperator( + task_id="Publish_planet_task", + provide_context=True, + python_callable=publish_planet, + dag=DAG, +) + + +RM_BUILD_TASK = make_rm_build_task(DAG) + + +UPDATE_PLANET_TASK >> PUBLISH_PLANET_TASK >> RM_BUILD_TASK diff --git a/tools/python/airmaps/instruments/settings.py b/tools/python/airmaps/instruments/settings.py new file mode 100644 index 0000000000..45200b476f --- /dev/null +++ b/tools/python/airmaps/instruments/settings.py @@ -0,0 +1,62 @@ +import sys +from typing import AnyStr + +from maps_generator.generator import settings + +STORAGE_PREFIX = "" + +# Storage settings +WD_HOST = "" +WD_LOGIN = "" +WD_PASSWORD = "" + +# Common section +EMAILS = [] + +settings.LOGGING["loggers"]["airmaps"] = { + "handlers": ["stdout", "file"], + "level": "DEBUG", + "propagate": True, +} + + +def get_airmaps_emails(emails: AnyStr): + if not emails: + return [] + + for ch in [",", ";", ":"]: + emails.replace(ch, " ") + + return list(filter(None, [e.strpip() for e in emails.join(" ")])) + + +def init(default_settings_path: AnyStr): + settings.init(default_settings_path) + + # Try to read a config and to overload default settings + cfg = settings.CfgReader(default_settings_path) + + # Storage section + global WD_HOST + global WD_LOGIN + global WD_PASSWORD + + WD_HOST = cfg.get_opt("Storage", "WD_HOST", WD_HOST) + WD_LOGIN = cfg.get_opt("Storage", "WD_LOGIN", WD_LOGIN) + WD_PASSWORD = cfg.get_opt("Storage", "WD_PASSWORD", WD_PASSWORD) + + # Common section + global EMAILS + emails = cfg.get_opt("Common", "EMAILS", "") + EMAILS = get_airmaps_emails(emails) + + # Import all contains from maps_generator.generator.settings. 
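+ # (Every public upper-case constant from that module is copied into this one,
+ # so airmaps code can read all settings from airmaps.instruments.settings.)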
+ thismodule = sys.modules[__name__] + for name in dir(settings): + if not name.startswith("_") and name.isupper(): + value = getattr(settings, name) + setattr(thismodule, name, value) + + global STORAGE_PREFIX + if settings.DEBUG: + STORAGE_PREFIX = "/tests" diff --git a/tools/python/airmaps/instruments/storage.py b/tools/python/airmaps/instruments/storage.py new file mode 100644 index 0000000000..e62b3da68a --- /dev/null +++ b/tools/python/airmaps/instruments/storage.py @@ -0,0 +1,27 @@ +import logging + +import webdav.client as wc + +from airmaps.instruments import settings + +logger = logging.getLogger("airmaps") + +WD_OPTIONS = { + "webdav_hostname": settings.WD_HOST, + "webdav_login": settings.WD_LOGIN, + "webdav_password": settings.WD_PASSWORD, +} + + +def wd_fetch(src, dst): + logger.info(f"Fetch form {src} to {dst} with options {WD_OPTIONS}.") + client = wc.Client(WD_OPTIONS) + client.download_sync(src, dst) + + +def wd_publish(src, dst): + logger.info(f"Publish form {src} to {dst} with options {WD_OPTIONS}.") + client = wc.Client(WD_OPTIONS) + tmp = f"{dst[:-1]}__/" if dst[-1] == "/" else f"{dst}__" + client.upload_sync(local_path=src, remote_path=tmp) + client.move(remote_path_from=tmp, remote_path_to=dst) diff --git a/tools/python/airmaps/instruments/utils.py b/tools/python/airmaps/instruments/utils.py new file mode 100644 index 0000000000..40eb42e8a9 --- /dev/null +++ b/tools/python/airmaps/instruments/utils.py @@ -0,0 +1,48 @@ +import os +import shutil +from datetime import datetime +from typing import Iterable + +from airflow.operators.python_operator import PythonOperator + +from maps_generator.generator.env import Env +from maps_generator.generator.stages import Stage +from maps_generator.generator.stages import get_stage_name +from maps_generator.maps_generator import run_generation + + +def put_current_date_in_filename(filename): + path, name = os.path.split(filename) + parts = name.split(".", maxsplit=1) + parts[0] += f"__{datetime.today().strftime('%Y_%m_%d')}" + return os.path.join(path, ".".join(parts)) + + +def get_latest_filename(filename, prefix=""): + path, name = os.path.split(filename) + parts = name.split(".", maxsplit=1) + assert len(parts) != 0, parts + parts[0] = f"{prefix}latest" + return os.path.join(path, ".".join(parts)) + + +def rm_build(**kwargs): + build_name = kwargs["ti"].xcom_pull(key="build_name") + env = Env(build_name=build_name) + shutil.rmtree(env.build_path) + + +def make_rm_build_task(dag): + return PythonOperator( + task_id="Rm_build_task", + provide_context=True, + python_callable=rm_build, + dag=dag, + ) + + +def run_generation_from_first_stage( + env: Env, stages: Iterable[Stage], build_lock: bool = True +): + from_stage = get_stage_name(next(iter(stages))) + run_generation(env, stages, from_stage, build_lock) diff --git a/tools/python/airmaps/requirements.txt b/tools/python/airmaps/requirements.txt new file mode 100644 index 0000000000..7034a8db88 --- /dev/null +++ b/tools/python/airmaps/requirements.txt @@ -0,0 +1,5 @@ +apache-airflow [postgres]==1.10.10 +psycopg2-binary==2.8.4 +cryptography==2.8 +webdavclient==1.0.8 +-r ../maps_generator/requirements.txt \ No newline at end of file diff --git a/tools/python/airmaps/sandbox/.dockerignore b/tools/python/airmaps/sandbox/.dockerignore new file mode 100644 index 0000000000..24f5e054ab --- /dev/null +++ b/tools/python/airmaps/sandbox/.dockerignore @@ -0,0 +1,4 @@ +./.git* +./android +./iphone +./xcode diff --git a/tools/python/airmaps/sandbox/README.md 
b/tools/python/airmaps/sandbox/README.md new file mode 100644 index 0000000000..0b00e341d6 --- /dev/null +++ b/tools/python/airmaps/sandbox/README.md @@ -0,0 +1,76 @@ +# Sandbox + +This project can show how airmaps works on your computer. + +## Setup + +You must have [docker](https://docs.docker.com/get-docker/) and [docker-compose](https://docs.docker.com/compose/install/). + +0. Change working directory: +```sh +$ cd omim/tools/python/airmaps/sandbox +``` + +1. Build airmaps service: +```sh +sandbox$ ./build.sh +``` + +2. Create storage(sandbox/storage directory): +```sh +sandbox$ ./create_storage.sh +``` +Note: May be you need ```sudo```, because ```./create_storage.sh``` to try change an owner of ```sandbox/storage/tests``` directory. + + +## Usage +### Starting +0. Change working directory: +```sh +$ cd omim/tools/python/airmaps/sandbox +``` + +1. Run all services: +```sh +sandbox$ docker-compose up +``` + +2. Open http://localhost in your browser. + +Note: You can see the results of airmaps working in ```sandbox/storage/tests```. + +### Stopping +0. Change working directory: +```sh +$ cd omim/tools/python/airmaps/sandbox +``` + +1. Stop all services: +Push Ctrl+C and +```sh +sandbox$ docker-compose down +``` + +### Clean +#### Clean storage and intermediate files: + +0. Change working directory: +```sh +$ cd omim/tools/python/airmaps/sandbox +``` + +1. Clean storage and intermediate files: +```sh +sandbox$ ./clean.sh +``` + +#### Remove images: +0. Change working directory: +```sh +$ cd omim/tools/python/airmaps/sandbox +``` + +1. Remove images: +```sh +sandbox$ docker-compose rm +``` diff --git a/tools/python/airmaps/sandbox/airmaps/Dockerfile b/tools/python/airmaps/sandbox/airmaps/Dockerfile new file mode 100644 index 0000000000..af3eddb1d8 --- /dev/null +++ b/tools/python/airmaps/sandbox/airmaps/Dockerfile @@ -0,0 +1,29 @@ +FROM python:3.6 + +ARG TZ=Etc/UTC + +WORKDIR /omim/ + +ADD . . + +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + libgl1-mesa-dev \ + libsqlite3-dev \ + qt5-default \ + zlib1g-dev \ + tzdata \ + locales-all + +RUN ln -fs /usr/share/zoneinfo/$TZ /etc/localtime && \ + dpkg-reconfigure --frontend noninteractive tzdata + +RUN echo "" | ./configure.sh \ + && ./tools/unix/build_omim.sh -rs generator_tool + +RUN pip install --upgrade pip + +RUN pip install werkzeug==0.16.0 \ + SQLAlchemy==1.3.15 \ + -r ./tools/python/airmaps/requirements.txt diff --git a/tools/python/airmaps/sandbox/airmaps/airflow.cfg b/tools/python/airmaps/sandbox/airmaps/airflow.cfg new file mode 100644 index 0000000000..686ae5b2c7 --- /dev/null +++ b/tools/python/airmaps/sandbox/airmaps/airflow.cfg @@ -0,0 +1,856 @@ +[core] +# The folder where your airflow pipelines live, most likely a +# subfolder in a code repository +# This path must be absolute +dags_folder = /omim/tools/python/airmaps/dags + +# The folder where airflow should store its log files +# This path must be absolute +base_log_folder = /airflow_home/logs + +# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. +# Users must supply an Airflow connection id that provides access to the storage +# location. If remote_logging is set to true, see UPDATING.md for additional +# configuration requirements. 
+remote_logging = False +remote_log_conn_id = +remote_base_log_folder = +encrypt_s3_logs = False + +# Logging level +logging_level = INFO +fab_logging_level = WARN + +# Logging class +# Specify the class that will specify the logging configuration +# This class has to be on the python classpath +# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG +logging_config_class = + +# Log format +# Colour the logs when the controlling terminal is a TTY. +colored_console_log = True +colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s +colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter + +log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s +simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s + +# Log filename format +log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log +log_processor_filename_template = {{ filename }}.log +dag_processor_manager_log_location = /airflow_home/logs/dag_processor_manager/dag_processor_manager.log + +# Hostname by providing a path to a callable, which will resolve the hostname +# The format is "package:function". For example, +# default value "socket:getfqdn" means that result from getfqdn() of "socket" package will be used as hostname +# No argument should be required in the function specified. +# If using IP address as hostname is preferred, use value "airflow.utils.net:get_host_ip_address" +hostname_callable = socket:getfqdn + +# Default timezone in case supplied date times are naive +# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) +default_timezone = system + +# The executor class that airflow should use. Choices include +# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor +executor = LocalExecutor + +# The SqlAlchemy connection string to the metadata database. +# SqlAlchemy supports many different database engine, more information +# their website +sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@db:5432/airflow + +# The encoding for the databases +sql_engine_encoding = utf-8 + +# If SqlAlchemy should pool database connections. +sql_alchemy_pool_enabled = True + +# The SqlAlchemy pool size is the maximum number of database connections +# in the pool. 0 indicates no limit. +sql_alchemy_pool_size = 5 + +# The maximum overflow size of the pool. +# When the number of checked-out connections reaches the size set in pool_size, +# additional connections will be returned up to this limit. +# When those additional connections are returned to the pool, they are disconnected and discarded. +# It follows then that the total number of simultaneous connections the pool will allow is pool_size + max_overflow, +# and the total number of "sleeping" connections the pool will allow is pool_size. +# max_overflow can be set to -1 to indicate no overflow limit; +# no limit will be placed on the total number of concurrent connections. Defaults to 10. +sql_alchemy_max_overflow = 10 + +# The SqlAlchemy pool recycle is the number of seconds a connection +# can be idle in the pool before it is invalidated. This config does +# not apply to sqlite. If the number of DB connections is ever exceeded, +# a lower config value will allow the system to recover faster. +sql_alchemy_pool_recycle = 1800 + +# Check connection at the start of each connection pool checkout. 
+# Typically, this is a simple statement like “SELECT 1”. +# More information here: https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic +sql_alchemy_pool_pre_ping = True + +# The schema to use for the metadata database +# SqlAlchemy supports databases with the concept of multiple schemas. +sql_alchemy_schema = + +# The amount of parallelism as a setting to the executor. This defines +# the max number of task instances that should run simultaneously +# on this airflow installation +parallelism = 32 + +# The number of task instances allowed to run concurrently by the scheduler +dag_concurrency = 16 + +# Are DAGs paused by default at creation +dags_are_paused_at_creation = True + +# The maximum number of active DAG runs per DAG +max_active_runs_per_dag = 16 + +# Whether to load the examples that ship with Airflow. It's good to +# get started, but you probably want to set this to False in a production +# environment +load_examples = False + +# Where your Airflow plugins are stored +plugins_folder = /airflow_home/plugins + +# Secret key to save connection passwords in the db +fernet_key = uoTKzPCjhVBsERkDylXY5g1hYeg7OAYjk_a_ek2YMwQ= + +# Whether to disable pickling dags +donot_pickle = False + +# How long before timing out a python file import +dagbag_import_timeout = 30 + +# How long before timing out a DagFileProcessor, which processes a dag file +dag_file_processor_timeout = 50 + +# The class to use for running task instances in a subprocess +task_runner = StandardTaskRunner + +# If set, tasks without a `run_as_user` argument will be run with this user +# Can be used to de-elevate a sudo user running Airflow when executing tasks +default_impersonation = + +# What security module to use (for example kerberos): +security = + +# If set to False enables some unsecure features like Charts and Ad Hoc Queries. +# In 2.0 will default to True. +secure_mode = False + +# Turn unit test mode on (overwrites many configuration options with test +# values at runtime) +unit_test_mode = False + +# Name of handler to read task instance logs. +# Default to use task handler. +task_log_reader = task + +# Whether to enable pickling for xcom (note that this is insecure and allows for +# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False). +enable_xcom_pickling = True + +# When a task is killed forcefully, this is the amount of time in seconds that +# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED +killed_task_cleanup_time = 60 + +# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or +# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params. +dag_run_conf_overrides_params = False + +# Worker initialisation check to validate Metadata Database connection +worker_precheck = False + +# When discovering DAGs, ignore any files that don't contain the strings `DAG` and `airflow`. +dag_discovery_safe_mode = True + +# The number of retries each task is going to have by default. Can be overridden at dag or task level. +default_task_retries = 0 + + +[cli] +# In what way should the cli access the API. The LocalClient will use the +# database directly, while the json_client will use the api running on the +# webserver +api_client = airflow.api.client.local_client + +# If you set web_server_url_prefix, do NOT forget to append it here, ex: +# endpoint_url = http://localhost:8080/myroot +# So api will look like: http://localhost:8080/myroot/api/experimental/... 
+endpoint_url = http://localhost:8080 + +[api] +# How to authenticate users of the API +auth_backend = airflow.api.auth.backend.default + +[lineage] +# what lineage backend to use +backend = + +[atlas] +sasl_enabled = False +host = +port = 21000 +username = +password = + +[operators] +# The default owner assigned to each new operator, unless +# provided explicitly or passed via `default_args` +default_owner = airflow +default_cpus = 1 +default_ram = 512 +default_disk = 512 +default_gpus = 0 + +[hive] +# Default mapreduce queue for HiveOperator tasks +default_hive_mapred_queue = + +[webserver] +# The base url of your website as airflow cannot guess what domain or +# cname you are using. This is used in automated emails that +# airflow sends to point links to the right web server +base_url = http://localhost:8080 + +# Default timezone to display all dates in the UI, can be UTC, system, or +# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the +# default value of core/default_timezone will be used +# Example: default_ui_timezone = America/New_York +default_ui_timezone = system + +# The ip specified when starting the web server +web_server_host = 0.0.0.0 + +# The port on which to run the web server +web_server_port = 8080 + +# Paths to the SSL certificate and key for the web server. When both are +# provided SSL will be enabled. This does not change the web server port. +web_server_ssl_cert = +web_server_ssl_key = + +# Number of seconds the webserver waits before killing gunicorn master that doesn't respond +web_server_master_timeout = 120 + +# Number of seconds the gunicorn webserver waits before timing out on a worker +web_server_worker_timeout = 120 + +# Number of workers to refresh at a time. When set to 0, worker refresh is +# disabled. When nonzero, airflow periodically refreshes webserver workers by +# bringing up new ones and killing old ones. +worker_refresh_batch_size = 1 + +# Number of seconds to wait before refreshing a batch of workers. +worker_refresh_interval = 30 + +# Secret key used to run your flask app +secret_key = temporary_key + +# Number of workers to run the Gunicorn web server +workers = 4 + +# The worker class gunicorn should use. Choices include +# sync (default), eventlet, gevent +worker_class = sync + +# Log files for the gunicorn webserver. '-' means log to stderr. +access_logfile = - +error_logfile = - + +# Expose the configuration file in the web server +# This is only applicable for the flask-admin based web UI (non FAB-based). +# In the FAB-based web UI with RBAC feature, +# access to configuration is controlled by role permissions. +expose_config = False + +# Set to true to turn on authentication: +# https://airflow.apache.org/security.html#web-authentication +authenticate = False + +# Filter the list of dags by owner name (requires authentication to be enabled) +filter_by_owner = False + +# Filtering mode. Choices include user (default) and ldapgroup. +# Ldap group filtering requires using the ldap backend +# +# Note that the ldap server needs the "memberOf" overlay to be set up +# in order to user the ldapgroup mode. +owner_mode = user + +# Default DAG view. Valid values are: +# tree, graph, duration, gantt, landing_times +dag_default_view = tree + +# Default DAG orientation. Valid values are: +# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top) +dag_orientation = LR + +# Puts the webserver in demonstration mode; blurs the names of Operators for +# privacy. 
+demo_mode = False + +# The amount of time (in secs) webserver will wait for initial handshake +# while fetching logs from other worker machine +log_fetch_timeout_sec = 5 + +# By default, the webserver shows paused DAGs. Flip this to hide paused +# DAGs by default +hide_paused_dags_by_default = False + +# Consistent page size across all listing views in the UI +page_size = 100 + +# Use FAB-based webserver with RBAC feature +rbac = False + +# Define the color of navigation bar +navbar_color = #007A87 + +# Default dagrun to show in UI +default_dag_run_display_number = 25 + +# Enable werkzeug `ProxyFix` middleware +enable_proxy_fix = False + +# Set secure flag on session cookie +cookie_secure = False + +# Set samesite policy on session cookie +cookie_samesite = + +# Default setting for wrap toggle on DAG code and TI log views. +default_wrap = False + +# Send anonymous user activity to your analytics tool +# analytics_tool = # choose from google_analytics, segment, or metarouter +# analytics_id = XXXXXXXXXXX + +[email] +email_backend = airflow.utils.email.send_email_smtp + + +[smtp] +# If you want airflow to send emails on retries, failure, and you want to use +# the airflow.utils.email.send_email_smtp function, you have to configure an +# smtp server here +smtp_host = localhost +smtp_starttls = True +smtp_ssl = False +# Uncomment and set the user/pass settings if you want to use SMTP AUTH +# smtp_user = airflow +# smtp_password = airflow +smtp_port = 25 +smtp_mail_from = airflow@example.com + +[sentry] +# Sentry (https://docs.sentry.io) integration +sentry_dsn = + + +[celery] +# This section only applies if you are using the CeleryExecutor in +# [core] section above + +# The app name that will be used by celery +celery_app_name = airflow.executors.celery_executor + +# The concurrency that will be used when starting workers with the +# "airflow worker" command. This defines the number of task instances that +# a worker will take, so size up your workers based on the resources on +# your worker box and the nature of your tasks +worker_concurrency = 40 + +# The maximum and minimum concurrency that will be used when starting workers with the +# "airflow worker" command (always keep minimum processes, but grow to maximum if necessary). +# Note the value should be "max_concurrency,min_concurrency" +# Pick these numbers based on resources on worker box and the nature of the task. +# If autoscale option is available, worker_concurrency will be ignored. +# http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale +# worker_autoscale = 16,12 + +# When you start an airflow worker, airflow starts a tiny web server +# subprocess to serve the workers local log files to the airflow main +# web server, who then builds pages and sends them to users. This defines +# the port on which the logs are served. It needs to be unused, and open +# visible from the main web server to connect into the workers. +worker_log_server_port = 8793 + +# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally +# a sqlalchemy database. Refer to the Celery documentation for more +# information. +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings +broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow + +# The Celery result_backend. When a job finishes, it needs to update the +# metadata of the job. 
Therefore it will post a message on a message bus, +# or insert it into a database (depending of the backend) +# This status is used by the scheduler to update the state of the task +# The use of a database is highly recommended +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings +result_backend = db+mysql://airflow:airflow@localhost:3306/airflow + +# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start +# it `airflow flower`. This defines the IP that Celery Flower runs on +flower_host = 0.0.0.0 + +# The root URL for Flower +# Ex: flower_url_prefix = /flower +flower_url_prefix = + +# This defines the port that Celery Flower runs on +flower_port = 5555 + +# Securing Flower with Basic Authentication +# Accepts user:password pairs separated by a comma +# Example: flower_basic_auth = user1:password1,user2:password2 +flower_basic_auth = + +# Default queue that tasks get assigned to and that worker listen on. +default_queue = default + +# How many processes CeleryExecutor uses to sync task state. +# 0 means to use max(1, number of cores - 1) processes. +sync_parallelism = 0 + +# Import path for celery configuration options +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG + +# In case of using SSL +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = + +# Celery Pool implementation. +# Choices include: prefork (default), eventlet, gevent or solo. +# See: +# https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency +# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html +pool = prefork + +[celery_broker_transport_options] +# This section is for specifying options which can be passed to the +# underlying celery broker transport. See: +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options + +# The visibility timeout defines the number of seconds to wait for the worker +# to acknowledge the task before the message is redelivered to another worker. +# Make sure to increase the visibility timeout to match the time of the longest +# ETA you're planning to use. +# +# visibility_timeout is only supported for Redis and SQS celery brokers. +# See: +# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options +# +#visibility_timeout = 21600 + +[dask] +# This section only applies if you are using the DaskExecutor in +# [core] section above + +# The IP address and port of the Dask cluster's scheduler. +cluster_address = 127.0.0.1:8786 +# TLS/ SSL settings to access a secured Dask scheduler. +tls_ca = +tls_cert = +tls_key = + + +[scheduler] +# Task instances listen for external kill signal (when you clear tasks +# from the CLI or the UI), this defines the frequency at which they should +# listen (in seconds). +job_heartbeat_sec = 5 + +# The scheduler constantly tries to trigger new tasks (look at the +# scheduler section in the docs for more information). This defines +# how often the scheduler should run (in seconds). 
+scheduler_heartbeat_sec = 5 + +# after how much time should the scheduler terminate in seconds +# -1 indicates to run continuously (see also num_runs) +run_duration = -1 + +# The number of times to try to schedule each DAG file +# -1 indicates unlimited number +num_runs = -1 + +# The number of seconds to wait between consecutive DAG file processing +processor_poll_interval = 1 + +# after how much time (seconds) a new DAGs should be picked up from the filesystem +min_file_process_interval = 0 + +# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. +dag_dir_list_interval = 300 + +# How often should stats be printed to the logs +print_stats_interval = 30 + +# If the last scheduler heartbeat happened more than scheduler_health_check_threshold ago (in seconds), +# scheduler is considered unhealthy. +# This is used by the health check in the "/health" endpoint +scheduler_health_check_threshold = 30 + +child_process_log_directory = /airflow_home/logs/scheduler + +# Local task jobs periodically heartbeat to the DB. If the job has +# not heartbeat in this many seconds, the scheduler will mark the +# associated task instance as failed and will re-schedule the task. +scheduler_zombie_task_threshold = 300 + +# Turn off scheduler catchup by setting this to False. +# Default behavior is unchanged and +# Command Line Backfills still work, but the scheduler +# will not do scheduler catchup if this is False, +# however it can be set on a per DAG basis in the +# DAG definition (catchup) +catchup_by_default = True + +# This changes the batch size of queries in the scheduling main loop. +# If this is too high, SQL query performance may be impacted by one +# or more of the following: +# - reversion to full table scan +# - complexity of query predicate +# - excessive locking +# +# Additionally, you may hit the maximum allowable query length for your db. +# +# Set this to 0 for no limit (not advised) +max_tis_per_query = 512 + +# Statsd (https://github.com/etsy/statsd) integration settings +statsd_on = False +statsd_host = localhost +statsd_port = 8125 +statsd_prefix = airflow + +# If you want to avoid send all the available metrics to StatsD, +# you can configure an allow list of prefixes to send only the metrics that +# start with the elements of the list (e.g: scheduler,executor,dagrun) +statsd_allow_list = + +# The scheduler can run multiple threads in parallel to schedule dags. +# This defines how many threads will run. +max_threads = 8 + +authenticate = False + +# Turn off scheduler use of cron intervals by setting this to False. +# DAGs submitted manually in the web UI or with trigger_dag will still run. +use_job_schedule = True + +[ldap] +# set this to ldaps://: +uri = +user_filter = objectClass=* +user_name_attr = uid +group_member_attr = memberOf +superuser_filter = +data_profiler_filter = +bind_user = cn=Manager,dc=example,dc=com +bind_password = insecure +basedn = dc=example,dc=com +cacert = /etc/ca/ldap_ca.crt +search_scope = LEVEL + +# This setting allows the use of LDAP servers that either return a +# broken schema, or do not return a schema. +ignore_malformed_schema = False + +[mesos] +# Mesos master address which MesosExecutor will connect to. 
+master = localhost:5050 + +# The framework name which Airflow scheduler will register itself as on mesos +framework_name = Airflow + +# Number of cpu cores required for running one task instance using +# 'airflow run --local -p ' +# command on a mesos slave +task_cpu = 1 + +# Memory in MB required for running one task instance using +# 'airflow run --local -p ' +# command on a mesos slave +task_memory = 256 + +# Enable framework checkpointing for mesos +# See http://mesos.apache.org/documentation/latest/slave-recovery/ +checkpoint = False + +# Failover timeout in milliseconds. +# When checkpointing is enabled and this option is set, Mesos waits +# until the configured timeout for +# the MesosExecutor framework to re-register after a failover. Mesos +# shuts down running tasks if the +# MesosExecutor framework fails to re-register within this timeframe. +# failover_timeout = 604800 + +# Enable framework authentication for mesos +# See http://mesos.apache.org/documentation/latest/configuration/ +authenticate = False + +# Mesos credentials, if authentication is enabled +# default_principal = admin +# default_secret = admin + +# Optional Docker Image to run on slave before running the command +# This image should be accessible from mesos slave i.e mesos slave +# should be able to pull this docker image before executing the command. +# docker_image_slave = puckel/docker-airflow + +[kerberos] +ccache = /tmp/airflow_krb5_ccache +# gets augmented with fqdn +principal = airflow +reinit_frequency = 3600 +kinit_path = kinit +keytab = airflow.keytab + + +[github_enterprise] +api_rev = v3 + +[admin] +# UI to hide sensitive variable fields when set to True +hide_sensitive_variable_fields = True + +[elasticsearch] +# Elasticsearch host +host = +# Format of the log_id, which is used to query for a given tasks logs +log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} +# Used to mark the end of a log stream for a task +end_of_log_mark = end_of_log +# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id +# Code will construct log_id using the log_id template from the argument above. +# NOTE: The code will prefix the https:// automatically, don't include that here. +frontend = +# Write the task logs to the stdout of the worker, rather than the default files +write_stdout = False +# Instead of the default log formatter, write the log lines as JSON +json_format = False +# Log fields to also attach to the json output, if enabled +json_fields = asctime, filename, lineno, levelname, message + +[elasticsearch_configs] + +use_ssl = False +verify_certs = True + +[kubernetes] +# The repository, tag and imagePullPolicy of the Kubernetes Image for the Worker to Run +worker_container_repository = +worker_container_tag = +worker_container_image_pull_policy = IfNotPresent + +# If True (default), worker pods will be deleted upon termination +delete_worker_pods = True + +# Number of Kubernetes Worker Pod creation calls per scheduler loop +worker_pods_creation_batch_size = 1 + +# The Kubernetes namespace where airflow workers should be created. 
Defaults to `default` +namespace = default + +# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) +airflow_configmap = + +# For docker image already contains DAGs, this is set to `True`, and the worker will search for dags in dags_folder, +# otherwise use git sync or dags volume claim to mount DAGs +dags_in_image = False + +# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs +dags_volume_subpath = + +# For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) +dags_volume_claim = + +# For volume mounted logs, the worker will look in this subpath for logs +logs_volume_subpath = + +# A shared volume claim for the logs +logs_volume_claim = + +# For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync) +# Useful in local environment, discouraged in production +dags_volume_host = + +# A hostPath volume for the logs +# Useful in local environment, discouraged in production +logs_volume_host = + +# A list of configMapsRefs to envFrom. If more than one configMap is +# specified, provide a comma separated list: configmap_a,configmap_b +env_from_configmap_ref = + +# A list of secretRefs to envFrom. If more than one secret is +# specified, provide a comma separated list: secret_a,secret_b +env_from_secret_ref = + +# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim) +git_repo = +git_branch = +git_subpath = +# Use git_user and git_password for user authentication or git_ssh_key_secret_name and git_ssh_key_secret_key +# for SSH authentication +git_user = +git_password = +git_sync_root = /git +git_sync_dest = repo +git_dags_folder_mount_point = + +# To get Git-sync SSH authentication set up follow this format +# +# airflow-secrets.yaml: +# --- +# apiVersion: v1 +# kind: Secret +# metadata: +# name: airflow-secrets +# data: +# # key needs to be gitSshKey +# gitSshKey: +# --- +# airflow-configmap.yaml: +# apiVersion: v1 +# kind: ConfigMap +# metadata: +# name: airflow-configmap +# data: +# known_hosts: | +# github.com ssh-rsa <...> +# airflow.cfg: | +# ... +# +# git_ssh_key_secret_name = airflow-secrets +# git_ssh_known_hosts_configmap_name = airflow-configmap +git_ssh_key_secret_name = +git_ssh_known_hosts_configmap_name = + +# To give the git_sync init container credentials via a secret, create a secret +# with two fields: GIT_SYNC_USERNAME and GIT_SYNC_PASSWORD (example below) and +# add `git_sync_credentials_secret = ` to your airflow config under the kubernetes section +# +# Secret Example: +# apiVersion: v1 +# kind: Secret +# metadata: +# name: git-credentials +# data: +# GIT_SYNC_USERNAME: +# GIT_SYNC_PASSWORD: +git_sync_credentials_secret = + +# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync +git_sync_container_repository = k8s.gcr.io/git-sync +git_sync_container_tag = v3.1.1 +git_sync_init_container_name = git-sync-clone +git_sync_run_as_user = 65533 + +# The name of the Kubernetes service account to be associated with airflow workers, if any. +# Service accounts are required for workers that require access to secrets or cluster resources. 
+# See the Kubernetes RBAC documentation for more: +# https://kubernetes.io/docs/admin/authorization/rbac/ +worker_service_account_name = + +# Any image pull secrets to be given to worker pods, If more than one secret is +# required, provide a comma separated list: secret_a,secret_b +image_pull_secrets = + +# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors +# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2 +gcp_service_account_keys = + +# Use the service account kubernetes gives to pods to connect to kubernetes cluster. +# It's intended for clients that expect to be running inside a pod running on kubernetes. +# It will raise an exception if called from a process not running in a kubernetes environment. +in_cluster = True + +# When running with in_cluster=False change the default cluster_context or config_file +# options to Kubernetes client. Leave blank these to use default behaviour like `kubectl` has. +# cluster_context = +# config_file = + + +# Affinity configuration as a single line formatted JSON object. +# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.): +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core +affinity = + +# A list of toleration objects as a single line formatted JSON array +# See: +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core +tolerations = + +# **kwargs parameters to pass while calling a kubernetes client core_v1_api methods from Kubernetes Executor +# provided as a single line formatted JSON dictionary string. +# List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis +# See: +# https://raw.githubusercontent.com/kubernetes-client/python/master/kubernetes/client/apis/core_v1_api.py +# Note that if no _request_timeout is specified, the kubernetes client will wait indefinitely for kubernetes +# api responses, which will cause the scheduler to hang. The timeout is specified as [connect timeout, read timeout] +kube_client_request_args = {"_request_timeout" : [60,60] } + +# Worker pods security context options +# See: +# https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ + +# Specifies the uid to run the first process of the worker pods containers as +run_as_user = + +# Specifies a gid to associate with all containers in the worker pods +# if using a git_ssh_key_secret_name use an fs_group +# that allows for the key to be read, e.g. 65533 +fs_group = + +[kubernetes_node_selectors] +# The Key-value pairs to be given to worker pods. +# The worker pods will be scheduled to the nodes of the specified key-value pairs. +# Should be supplied in the format: key = value + +[kubernetes_annotations] +# The Key-value annotations pairs to be given to worker pods. +# Should be supplied in the format: key = value + +[kubernetes_environment_variables] +# The scheduler sets the following environment variables into your workers. You may define as +# many environment variables as needed and the kubernetes launcher will set them in the launched workers. +# Environment variables in this section are defined as follows +# = +# +# For example if you wanted to set an environment variable with value `prod` and key +# `ENVIRONMENT` you would follow the following format: +# ENVIRONMENT = prod +# +# Additionally you may override worker airflow settings with the AIRFLOW__
__ +# formatting as supported by airflow normally. + +[kubernetes_secrets] +# The scheduler mounts the following secrets into your workers as they are launched by the +# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the +# defined secrets and mount them as secret environment variables in the launched workers. +# Secrets in this section are defined as follows +# = = +# +# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the +# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into +# your workers you would follow the following format: +# POSTGRES_PASSWORD = airflow-secret=postgres_credentials +# +# Additionally you may override worker airflow settings with the AIRFLOW__
__ +# formatting as supported by airflow normally. + +[kubernetes_labels] +# The Key-value pairs to be given to worker pods. +# The worker pods will be given these static labels, as well as some additional dynamic labels +# to identify the task. +# Should be supplied in the format: key = value diff --git a/tools/python/airmaps/sandbox/airmaps/airmaps.ini b/tools/python/airmaps/sandbox/airmaps/airmaps.ini new file mode 100644 index 0000000000..a840687143 --- /dev/null +++ b/tools/python/airmaps/sandbox/airmaps/airmaps.ini @@ -0,0 +1,82 @@ +[Main] +# The path where the planet will be downloaded and the maps are generated. +MAIN_OUT_PATH: /maps_build +# If the flag DEBUG is set a special small planet file will be downloaded. +DEBUG: 1 + + +[Developer] +# The path where the generator_tool will be searched. +BUILD_PATH: /omim-build-release +# The path to the project directory omim. +OMIM_PATH: /omim + + +[Storage] +# Webdaw settings. +WD_HOST: webdav +WD_LOGIN: alice +WD_PASSWORD: secret1234 + + +[Generator tool] +# The path to the omim/data. +USER_RESOURCE_PATH: ${Developer:OMIM_PATH}/data +# Do not change it. This is determined automatically. +# NODE_STORAGE: map + + +[Osm tools] +# The path to the osmctools sources. +OSM_TOOLS_SRC_PATH: ${Developer:OMIM_PATH}/tools/osmctools +# The path where osmctools will be searched or will be built. +OSM_TOOLS_PATH: /osmctools + + +[Stages] +# Run osmupdate tool for planet. +NEED_PLANET_UPDATE: 0 + + +[Logging] +# The path where maps_generator log will be saved. +# LOG_FILE_PATH: generation.log + + +[External] +# The url to the planet file. +# PLANET_URL: +# The url to the file with md5 sum of the planet. +# PLANET_MD5_URL: +# The base url to WorldCoasts.geom and WorldCoasts.rawgeom (without file name). +# Files latest_coasts.geom and latest_coasts.rawgeom must be at this URL. +# For example, if PLANET_COASTS_URL = https://somesite.com/download/ +# The https://somesite.com/download/latest_coasts.geom url will be used to download latest_coasts.geom and +# the https://somesite.com/download/latest_coasts.rawgeom url will be used to download latest_coasts.rawgeom. +# PLANET_COASTS_URL: +# The url to the subway file. +SUBWAY_URL: http://osm-subway.maps.me/mapsme/latest.json + +# Urls for production maps generation. +# UGC_URL: +# HOTELS_URL: +# PROMO_CATALOG_CITIES: +# POPULARITY_URL: +# FOOD_URL: +# FOOD_TRANSLATIONS_URL: +# SRTM_PATH: +# ISOLINES_PATH: +# UK_POSTCODES_URL: +# US_POSTCODES_URL: + + +[Common] +# Auto detection. +THREADS_COUNT: 0 +# Emails for mailing. +# EMAILS: + + +[Stats] +# Path to rules for calculating statistics by type +STATS_TYPES_CONFIG: ${Developer:OMIM_PATH}/tools/python/maps_generator/var/etc/stats_types_config.txt diff --git a/tools/python/airmaps/sandbox/airmaps/run_airmaps_service.sh b/tools/python/airmaps/sandbox/airmaps/run_airmaps_service.sh new file mode 100755 index 0000000000..697c23681d --- /dev/null +++ b/tools/python/airmaps/sandbox/airmaps/run_airmaps_service.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +export PYTHONPATH=/omim/tools/python +export AIRFLOW_HOME=/airflow_home + +# Initialize the database. +airflow initdb + +# Start the web server, default port is 8880. +airflow webserver -p 8880 & + +# Start the scheduler. 
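+# (The scheduler runs in the foreground and keeps the container alive;
+# the webserver started above listens on port 8880, which docker-compose
+# publishes as port 80.)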
+airflow scheduler diff --git a/tools/python/airmaps/sandbox/build.sh b/tools/python/airmaps/sandbox/build.sh new file mode 100755 index 0000000000..48df266db6 --- /dev/null +++ b/tools/python/airmaps/sandbox/build.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +BUILD_PATH="$(dirname "$0")" +OMIM_PATH="$(cd "${OMIM_PATH:-${BUILD_PATH}/../../../..}"; pwd)" + +echo "Build airmaps service.." + +mv "${OMIM_PATH}/.dockerignore" "${OMIM_PATH}/.dockerignore_" 2> /dev/null +cp "${BUILD_PATH}/.dockerignore" ${OMIM_PATH} + +docker-compose build + +rm "${OMIM_PATH}/.dockerignore" +mv "${OMIM_PATH}/.dockerignore_" "${OMIM_PATH}/.dockerignore" 2> /dev/null diff --git a/tools/python/airmaps/sandbox/clean.sh b/tools/python/airmaps/sandbox/clean.sh new file mode 100755 index 0000000000..3c7e8b0311 --- /dev/null +++ b/tools/python/airmaps/sandbox/clean.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +BUILD_PATH="$(dirname "$0")" +OMIM_PATH="$(cd "${OMIM_PATH:-${BUILD_PATH}/../../../..}"; pwd)" + +echo "Cleaning.." +rm "${OMIM_PATH}/.dockerignore" 2> /dev/null +mv "${OMIM_PATH}/.dockerignore_" "${OMIM_PATH}/.dockerignore" 2> /dev/null + +rm -r "${BUILD_PATH}/storage" 2> /dev/null diff --git a/tools/python/airmaps/sandbox/create_storage.sh b/tools/python/airmaps/sandbox/create_storage.sh new file mode 100755 index 0000000000..9baa355bf6 --- /dev/null +++ b/tools/python/airmaps/sandbox/create_storage.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +BUILD_PATH="$(dirname "$0")" + +echo "Creating storage.." +mkdir -p "${BUILD_PATH}/storage/tests/coasts" +mkdir -p "${BUILD_PATH}/storage/tests/maps/open_source" +mkdir -p "${BUILD_PATH}/storage/tests/planet_regular" + +chown -R www-data:www-data "${BUILD_PATH}/storage/tests" diff --git a/tools/python/airmaps/sandbox/docker-compose.yml b/tools/python/airmaps/sandbox/docker-compose.yml new file mode 100644 index 0000000000..125d24addd --- /dev/null +++ b/tools/python/airmaps/sandbox/docker-compose.yml @@ -0,0 +1,41 @@ +version: "3" +services: + webdav: + image: sashgorokhov/webdav + container_name: webdav + hostname: webdav + restart: always + environment: + USERNAME: alice + PASSWORD: secret1234 + volumes: + - ./storage/tests:/media/tests + + db: + image: postgres:12.2 + container_name: db + hostname: db + restart: always + environment: + POSTGRES_DB: airflow + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + + airmaps: + build: + context: ../../../.. + dockerfile: ./tools/python/airmaps/sandbox/airmaps/Dockerfile + args: + - TZ=Europe/Moscow + container_name: airmaps + hostname: airmaps + restart: always + links: + - webdav + - db + ports: + - "80:8880" + command: ./tools/python/airmaps/sandbox/airmaps/run_airmaps_service.sh + volumes: + - ./airmaps/airflow.cfg:/airflow_home/airflow.cfg + - ./airmaps/airmaps.ini:/omim/tools/python/airmaps/var/etc/airmaps.ini diff --git a/tools/python/airmaps/var/etc/airmaps.ini.default b/tools/python/airmaps/var/etc/airmaps.ini.default new file mode 100644 index 0000000000..d4a5921ec2 --- /dev/null +++ b/tools/python/airmaps/var/etc/airmaps.ini.default @@ -0,0 +1,82 @@ +[Main] +# The path where the planet will be downloaded and the maps are generated. +MAIN_OUT_PATH: ~/maps_build +# If the flag DEBUG is set a special small planet file will be downloaded. +DEBUG: 1 + + +[Developer] +# The path where the generator_tool will be searched. +BUILD_PATH: ~/omim-build-release +# The path to the project directory omim. +OMIM_PATH: ~/omim + + +[Storage] +# Webdaw settings. 
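+# (Credentials of the WebDAV server used as the storage for planet, coastline
+# and map files; see tools/python/airmaps/sandbox/airmaps/airmaps.ini for an example.)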
+# WD_HOST: +# WD_LOGIN: +# WD_PASSWORD: + + +[Generator tool] +# The path to the omim/data. +USER_RESOURCE_PATH: ${Developer:OMIM_PATH}/data +# Do not change it. This is determined automatically. +# NODE_STORAGE: map + + +[Osm tools] +# The path to the osmctools sources. +OSM_TOOLS_SRC_PATH: ${Developer:OMIM_PATH}/tools/osmctools +# The path where osmctools will be searched or will be built. +OSM_TOOLS_PATH: ~/osmctools + + +[Stages] +# Run osmupdate tool for planet. +NEED_PLANET_UPDATE: 0 + + +[Logging] +# The path where maps_generator log will be saved. +# LOG_FILE_PATH: generation.log + + +[External] +# The url to the planet file. +# PLANET_URL: +# The url to the file with md5 sum of the planet. +# PLANET_MD5_URL: +# The base url to WorldCoasts.geom and WorldCoasts.rawgeom (without file name). +# Files latest_coasts.geom and latest_coasts.rawgeom must be at this URL. +# For example, if PLANET_COASTS_URL = https://somesite.com/download/ +# The https://somesite.com/download/latest_coasts.geom url will be used to download latest_coasts.geom and +# the https://somesite.com/download/latest_coasts.rawgeom url will be used to download latest_coasts.rawgeom. +# PLANET_COASTS_URL: +# The url to the subway file. +SUBWAY_URL: http://osm-subway.maps.me/mapsme/latest.json + +# Urls for production maps generation. +# UGC_URL: +# HOTELS_URL: +# PROMO_CATALOG_CITIES: +# POPULARITY_URL: +# FOOD_URL: +# FOOD_TRANSLATIONS_URL: +# SRTM_PATH: +# ISOLINES_PATH: +# UK_POSTCODES_URL: +# US_POSTCODES_URL: + + +[Common] +# Auto detection. +THREADS_COUNT: 0 +# Emails for mailing. +# EMAILS: + + +[Stats] +# Path to rules for calculating statistics by type +STATS_TYPES_CONFIG: ${Developer:OMIM_PATH}/tools/python/maps_generator/var/etc/stats_types_config.txt diff --git a/tools/python/descriptions/descriptions_downloader.py b/tools/python/descriptions/descriptions_downloader.py index b0a7f86bfe..ec76367c8f 100644 --- a/tools/python/descriptions/descriptions_downloader.py +++ b/tools/python/descriptions/descriptions_downloader.py @@ -266,10 +266,9 @@ def download_from_wikidata_tags(input_file, output_dir, langs, checker): wikidata_output_dir = os.path.join(output_dir, "wikidata") os.makedirs(wikidata_output_dir, exist_ok=True) with open(input_file) as file: - pool = ThreadPool(processes=WORKERS) - pool.map(wikidata_worker(wikidata_output_dir, checker, langs), file, CHUNK_SIZE) - pool.close() - pool.join() + with ThreadPool(processes=WORKERS) as pool: + pool.map(wikidata_worker(wikidata_output_dir, checker, langs), + file, CHUNK_SIZE) def check_and_get_checker(popularity_file): diff --git a/tools/python/maps_generator/generator/env.py b/tools/python/maps_generator/generator/env.py index e89e484cd5..cf8d12dbb3 100644 --- a/tools/python/maps_generator/generator/env.py +++ b/tools/python/maps_generator/generator/env.py @@ -17,12 +17,13 @@ from typing import Type from typing import Union from maps_generator.generator import settings +from maps_generator.generator import status from maps_generator.generator.osmtools import build_osmtools from maps_generator.generator.stages import Stage from maps_generator.generator.status import Status from maps_generator.utils.file import find_executable from maps_generator.utils.file import is_executable -from maps_generator.utils.file import symlink_force +from maps_generator.utils.file import make_symlink logger = logging.getLogger("maps_generator") @@ -221,7 +222,7 @@ class PathProvider: @property def main_status_path(self) -> AnyStr: - return os.path.join(self.status_path, 
"stages.status") + return os.path.join(self.status_path, status.with_stat_ext("stages")) @property def packed_polygons_path(self) -> AnyStr: @@ -348,7 +349,7 @@ class Env: build_name: Optional[AnyStr] = None, build_suffix: AnyStr = "", skipped_stages: Optional[Set[Type[Stage]]] = None, - force_download_files: bool = False + force_download_files: bool = False, ): self.setup_logging() @@ -399,7 +400,12 @@ class Env: self.setup_borders() self.setup_osm2ft() - self.main_status = Status() + if self.force_download_files: + for item in os.listdir(self.paths.status_path): + if item.endswith(".download"): + os.remove(os.path.join(self.paths.status_path, item)) + + self.main_status = status.Status() # self.countries_meta stores log files and statuses for each country. self.countries_meta = collections.defaultdict(dict) self.subprocess_out = None @@ -507,22 +513,14 @@ class Env: def setup_borders(self): temp_borders = self.paths.generation_borders_path - # It is needed in case of rebuilding several mwms. - for filename in os.listdir(temp_borders): - file_path = os.path.join(temp_borders, filename) - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - borders = PathProvider.borders_path() for x in self.countries: if x in WORLDS_NAMES: continue poly = f"{x}.poly" - os.symlink(os.path.join(borders, poly), os.path.join(temp_borders, poly)) - symlink_force(temp_borders, os.path.join(self.paths.draft_path, "borders")) + make_symlink(os.path.join(borders, poly), os.path.join(temp_borders, poly)) + make_symlink(temp_borders, os.path.join(self.paths.draft_path, "borders")) def setup_osm2ft(self): for x in os.listdir(self.paths.osm2ft_path): diff --git a/tools/python/maps_generator/generator/generation.py b/tools/python/maps_generator/generator/generation.py index 1adf7f96f6..ff82023f39 100644 --- a/tools/python/maps_generator/generator/generation.py +++ b/tools/python/maps_generator/generator/generation.py @@ -13,6 +13,7 @@ from maps_generator.generator.stages import Stage from maps_generator.generator.stages import get_stage_name from maps_generator.generator.stages import stages from maps_generator.generator.status import Status +from maps_generator.generator.status import without_stat_ext class Generation: @@ -26,17 +27,24 @@ class Generation: generation.run() """ - def __init__(self, env: Env): + def __init__(self, env: Env, build_lock: bool = True): self.env: Env = env self.stages: List[Stage] = [] self.runnable_stages: Optional[List[Stage]] = None + self.build_lock: bool = build_lock for country_stage in stages.countries_stages: if self.is_skipped_stage(country_stage): self.env.add_skipped_stage(country_stage) + for stage in stages.stages: + if self.is_skipped_stage(stage): + self.env.add_skipped_stage(stage) + def is_skipped_stage(self, stage: Union[Type[Stage], Stage]) -> bool: - return stage.is_production_only and not self.env.production + return ( + stage.is_production_only and not self.env.production + ) or not self.env.is_accepted_stage(stage) def add_stage(self, stage: Stage): self.stages.append(stage) @@ -65,10 +73,16 @@ class Generation: if from_stage is not None: self.reset_to_stage(from_stage) - lock_filename = f"{os.path.join(self.env.paths.build_path, 'lock')}.lock" - with filelock.FileLock(lock_filename): - for stage in self.runnable_stages: - stage(self.env) + if self.build_lock: + lock_filename = f"{os.path.join(self.env.paths.build_path, 'lock')}.lock" + with filelock.FileLock(lock_filename, 
timeout=1): + self.run_stages() + else: + self.run_stages() + + def run_stages(self): + for stage in self.runnable_stages: + stage(self.env) def reset_to_stage(self, stage_name: AnyStr): """ @@ -77,13 +91,31 @@ class Generation: It supposes that stages have next representation: stage1, ..., stage_mwm[country_stage_1, ..., country_stage_M], ..., stageN """ - countries_statuses_paths = [ - os.path.join(self.env.paths.status_path, f) - for f in os.listdir(self.env.paths.status_path) - if os.path.isfile(os.path.join(self.env.paths.status_path, f)) - and os.path.join(self.env.paths.status_path, f) - != self.env.paths.main_status_path - ] + high_level_stages = [get_stage_name(s) for s in self.runnable_stages] + if not ( + stage_name in high_level_stages + or any(stage_name == get_stage_name(s) for s in stages.countries_stages) + ): + raise ContinueError(f"{stage_name} not in {', '.join(high_level_stages)}.") + + if not os.path.exists(self.env.paths.status_path): + raise ContinueError(f"Status path {self.env.paths.status_path} not found.") + + if not os.path.exists(self.env.paths.main_status_path): + raise ContinueError( + f"Status file {self.env.paths.main_status_path} not found." + ) + + countries_statuses_paths = [] + countries = set(self.env.countries) + for f in os.listdir(self.env.paths.status_path): + full_name = os.path.join(self.env.paths.status_path, f) + if ( + os.path.isfile(full_name) + and full_name != self.env.paths.main_status_path + and without_stat_ext(f) in countries + ): + countries_statuses_paths.append(full_name) def set_countries_stage(st): for path in countries_statuses_paths: @@ -93,26 +125,20 @@ class Generation: for path in countries_statuses_paths: Status(path).finish() - high_level_stages = [get_stage_name(s) for s in self.runnable_stages] - if not ( - stage_name in high_level_stages - or any(stage_name == get_stage_name(s) for s in stages.countries_stages) - ): - raise ContinueError(f"{stage_name} not in {', '.join(high_level_stages)}.") - - if not os.path.exists(self.env.paths.main_status_path): - raise ContinueError( - f"Status file {self.env.paths.main_status_path} not found." 
- ) - - if not os.path.exists(self.env.paths.status_path): - raise ContinueError(f"Status path {self.env.paths.status_path} not found.") + def index(l: List, val): + try: + return l.index(val) + except ValueError: + return -1 mwm_stage_name = get_stage_name(stages.mwm_stage) - stage_mwm_index = high_level_stages.index(mwm_stage_name) + stage_mwm_index = index(high_level_stages, mwm_stage_name) main_status = None - if stage_name in high_level_stages[: stage_mwm_index + 1]: + if ( + stage_mwm_index == -1 + or stage_name in high_level_stages[: stage_mwm_index + 1] + ): main_status = stage_name set_countries_stage("") elif stage_name in high_level_stages[stage_mwm_index + 1 :]: diff --git a/tools/python/maps_generator/generator/settings.py b/tools/python/maps_generator/generator/settings.py index d3d77c8852..31bf0f4692 100644 --- a/tools/python/maps_generator/generator/settings.py +++ b/tools/python/maps_generator/generator/settings.py @@ -135,8 +135,12 @@ PLANET_COASTS_GEOM_URL = os.path.join(PLANET_COASTS_URL, "latest_coasts.geom") PLANET_COASTS_RAWGEOM_URL = os.path.join(PLANET_COASTS_URL, "latest_coasts.rawgeom") if DEBUG: - PLANET_URL = "http://osmz.ru/mwm/islands/islands.o5m" - PLANET_MD5_URL = "https://cloclo10.cldmail.ru/2n5jWJm11RtdLYm5QFYM/G/QAsr/24zvN9Gf8" + PLANET_URL = "https://www.dropbox.com/s/m3ru5tnj8g9u4cz/planet-latest.o5m?raw=1" + PLANET_MD5_URL = ( + "https://www.dropbox.com/s/8wdl2hy22jgisk5/planet-latest.o5m.md5?raw=1" + ) + NODE_STORAGE = "map" + NEED_PLANET_UPDATE = False # Common: THREADS_COUNT = multiprocessing.cpu_count() @@ -279,5 +283,9 @@ def init(default_settings_path: AnyStr): PLANET_COASTS_RAWGEOM_URL = os.path.join(PLANET_COASTS_URL, "latest_coasts.rawgeom") if DEBUG: - PLANET_URL = "http://osmz.ru/mwm/islands/islands.o5m" - PLANET_MD5_URL = "https://cloclo10.cldmail.ru/2n5jWJm11RtdLYm5QFYM/G/QAsr/24zvN9Gf8" + PLANET_URL = "https://www.dropbox.com/s/m3ru5tnj8g9u4cz/planet-latest.o5m?raw=1" + PLANET_MD5_URL = ( + "https://www.dropbox.com/s/8wdl2hy22jgisk5/planet-latest.o5m.md5?raw=1" + ) + NODE_STORAGE = "map" + NEED_PLANET_UPDATE = False diff --git a/tools/python/maps_generator/generator/stages.py b/tools/python/maps_generator/generator/stages.py index 60c6f161ed..fb0a56302c 100644 --- a/tools/python/maps_generator/generator/stages.py +++ b/tools/python/maps_generator/generator/stages.py @@ -14,15 +14,17 @@ import time from abc import ABC from abc import abstractmethod from collections import defaultdict -from multiprocessing import Lock from typing import AnyStr from typing import Callable +from typing import Dict from typing import List from typing import Optional from typing import Type from typing import Union -from maps_generator.generator.status import Status +import filelock + +from maps_generator.generator import status from maps_generator.utils.file import download_files from maps_generator.utils.file import normalize_url_to_path_dict from maps_generator.utils.log import DummyObject @@ -184,16 +186,18 @@ def country_stage_status(stage: Type[Stage]) -> Type[Stage]: return if "status" not in countries_meta[country]: - countries_meta[country]["status"] = Status() + countries_meta[country]["status"] = status.Status() - status = countries_meta[country]["status"] - status_file = os.path.join(env.paths.status_path, f"{country}.status") - status.init(status_file, name) - if status.need_skip(): + country_status = countries_meta[country]["status"] + status_file = os.path.join( + env.paths.status_path, status.with_stat_ext(country) + ) + 
country_status.init(status_file, name) + if country_status.need_skip(): _logger.warning(f"{name} was skipped.") return - status.update_status() + country_status.update_status() method(obj, env, country, *args, **kwargs) return apply @@ -261,28 +265,39 @@ def helper_stage_for(*deps) -> Callable[[Type[Stage],], Type[Stage]]: def depends_from_internal(*deps) -> Callable[[Type[Stage],], Type[Stage]]: + def get_urls( + env: "Env", internal_dependencies: List[InternalDependency] + ) -> Dict[AnyStr, AnyStr]: + deps = {} + for d in internal_dependencies: + if "p" in d.mode and not env.production: + continue + + path = None + if type(d.path_method) is property: + path = d.path_method.__get__(env.paths) + + assert path is not None, type(d.path_method) + deps[d.url] = path + + return deps + + def download_under_lock(env: "Env", urls: Dict[AnyStr, AnyStr], stage_name: AnyStr): + lock_name = f"{os.path.join(env.paths.status_path, stage_name)}.lock" + status_name = f"{os.path.join(env.paths.status_path, stage_name)}.download" + with filelock.FileLock(lock_name): + s = status.Status(status_name) + if not s.is_finished(): + urls = normalize_url_to_path_dict(urls) + download_files(urls, env.force_download_files) + s.finish() + def new_apply(method): def apply(obj: Stage, env: "Env", *args, **kwargs): if hasattr(obj, "internal_dependencies") and obj.internal_dependencies: - with obj.depends_from_internal_lock: - if not obj.depends_from_internal_downloaded: - deps = {} - for d in obj.internal_dependencies: - if "p" in d.mode and not env.production: - continue - - path = None - if type(d.path_method) is property: - path = d.path_method.__get__(env.paths) - - assert path is not None, type(d.path_method) - deps[d.url] = path - - if deps: - deps = normalize_url_to_path_dict(deps) - download_files(deps, env.force_download_files) - - obj.depends_from_internal_downloaded = True + urls = get_urls(env, obj.internal_dependencies) + if urls: + download_under_lock(env, urls, get_stage_name(obj)) method(obj, env, *args, **kwargs) @@ -290,9 +305,7 @@ def depends_from_internal(*deps) -> Callable[[Type[Stage],], Type[Stage]]: def wrapper(stage: Type[Stage]) -> Type[Stage]: stage.internal_dependencies = deps - stage.depends_from_internal_lock = Lock() - stage.depends_from_internal_downloaded = False stage.apply = new_apply(stage.apply) return stage - return wrapper + return wrapper \ No newline at end of file diff --git a/tools/python/maps_generator/generator/stages_declaration.py b/tools/python/maps_generator/generator/stages_declaration.py index 47128fa15b..cf74b8dcfd 100644 --- a/tools/python/maps_generator/generator/stages_declaration.py +++ b/tools/python/maps_generator/generator/stages_declaration.py @@ -278,7 +278,11 @@ class StageMwmStatistics(Stage): @outer_stage @depends_from_internal( - D(settings.PROMO_CATALOG_COUNTRIES_URL, PathProvider.promo_catalog_countries_path, "p") + D( + settings.PROMO_CATALOG_COUNTRIES_URL, + PathProvider.promo_catalog_countries_path, + "p", + ) ) class StageCountriesTxt(Stage): def apply(self, env: Env): @@ -356,7 +360,7 @@ class StageStatistics(Stage): with open(os.path.join(env.paths.stats_path, f"{country}.json")) as f: stats["countries"][country] = { "types": json.load(f), - "steps": steps_info["countries"][country] + "steps": steps_info["countries"][country], } def default(o): @@ -373,7 +377,7 @@ class StageStatistics(Stage): class StageCleanup(Stage): def apply(self, env: Env): logger.info( - f"osm2ft files will be moved from {env.paths.build_path} " + f"osm2ft files will be moved 
from {env.paths.mwm_path} " f"to {env.paths.osm2ft_path}." ) for x in os.listdir(env.paths.mwm_path): diff --git a/tools/python/maps_generator/generator/status.py b/tools/python/maps_generator/generator/status.py index 2bd6c627ca..6dbd84c97b 100644 --- a/tools/python/maps_generator/generator/status.py +++ b/tools/python/maps_generator/generator/status.py @@ -1,11 +1,22 @@ import os from typing import AnyStr +from typing import Optional + + +def with_stat_ext(country: AnyStr): + return f"{country}.status" + + +def without_stat_ext(status: AnyStr): + return status.replace(".status", "") class Status: """Status is used for recovering and continuation maps generation.""" - def __init__(self, stat_path: AnyStr = None, stat_next: AnyStr = None): + def __init__( + self, stat_path: Optional[AnyStr] = None, stat_next: Optional[AnyStr] = None + ): self.stat_path = stat_path self.stat_next = stat_next self.stat_saved = None @@ -14,9 +25,7 @@ class Status: def init(self, stat_path: AnyStr, stat_next: AnyStr): self.stat_path = stat_path self.stat_next = stat_next - if os.path.exists(self.stat_path) and os.path.isfile(self.stat_path): - with open(self.stat_path) as status: - self.stat_saved = status.read() + self.stat_saved = self.status() if not self.find: self.find = not self.stat_saved or not self.need_skip() @@ -32,3 +41,13 @@ class Status: def finish(self): with open(self.stat_path, "w") as status: status.write("finish") + + def is_finished(self): + return self.status() == "finish" + + def status(self): + try: + with open(self.stat_path) as status: + return status.read() + except IOError: + return None diff --git a/tools/python/maps_generator/generator/steps.py b/tools/python/maps_generator/generator/steps.py index 3e1761a792..4b74a88dd0 100644 --- a/tools/python/maps_generator/generator/steps.py +++ b/tools/python/maps_generator/generator/steps.py @@ -22,7 +22,7 @@ from maps_generator.generator.osmtools import osmupdate from maps_generator.generator.statistics import make_stats from maps_generator.utils.file import download_files from maps_generator.utils.file import is_verified -from maps_generator.utils.file import symlink_force +from maps_generator.utils.file import make_symlink from maps_generator.utils.md5 import md5_ext from maps_generator.utils.md5 import write_md5sum @@ -141,7 +141,7 @@ def run_gen_tool_with_recovery_country(env: Env, *args, **kwargs): mwm = f"{kwargs['output']}.mwm" osm2ft = f"{mwm}.osm2ft" kwargs["data_path"] = env.paths.draft_path - symlink_force( + make_symlink( os.path.join(prev_data_path, osm2ft), os.path.join(env.paths.draft_path, osm2ft) ) shutil.copy( @@ -338,8 +338,7 @@ def step_statistics(env: Env, country: AnyStr, **kwargs): json.dump( make_stats( settings.STATS_TYPES_CONFIG, - os.path.join(env.paths.intermediate_data_path, - f"{country}.stats") + os.path.join(env.paths.intermediate_data_path, f"{country}.stats"), ), - f + f, ) diff --git a/tools/python/maps_generator/maps_generator.py b/tools/python/maps_generator/maps_generator.py index 2ace39fb72..7fd9656af9 100644 --- a/tools/python/maps_generator/maps_generator.py +++ b/tools/python/maps_generator/maps_generator.py @@ -1,16 +1,23 @@ import logging from typing import AnyStr +from typing import Iterable from typing import Optional from maps_generator.generator import stages_declaration as sd from maps_generator.generator.env import Env from maps_generator.generator.generation import Generation +from .generator.stages import Stage logger = logging.getLogger("maps_generator") -def run_generation(env, stages, 
from_stage: Optional[AnyStr] = None): - generation = Generation(env) +def run_generation( + env: Env, + stages: Iterable[Stage], + from_stage: Optional[AnyStr] = None, + build_lock: bool = True, +): + generation = Generation(env, build_lock) for s in stages: generation.add_stage(s) @@ -19,7 +26,7 @@ def run_generation(env, stages, from_stage: Optional[AnyStr] = None): def generate_maps(env: Env, from_stage: Optional[AnyStr] = None): """"Runs maps generation.""" - stages = [ + stages = ( sd.StageDownloadAndConvertPlanet(), sd.StageUpdatePlanet(), sd.StageCoastline(), @@ -32,18 +39,18 @@ def generate_maps(env: Env, from_stage: Optional[AnyStr] = None): sd.StageLocalAds(), sd.StageStatistics(), sd.StageCleanup(), - ] + ) run_generation(env, stages, from_stage) def generate_coasts(env: Env, from_stage: Optional[AnyStr] = None): """Runs coasts generation.""" - stages = [ + stages = ( sd.StageDownloadAndConvertPlanet(), sd.StageUpdatePlanet(), sd.StageCoastline(use_old_if_fail=False), sd.StageCleanup(), - ] + ) run_generation(env, stages, from_stage) diff --git a/tools/python/maps_generator/utils/file.py b/tools/python/maps_generator/utils/file.py index 84d8c14406..8cedf97721 100644 --- a/tools/python/maps_generator/utils/file.py +++ b/tools/python/maps_generator/utils/file.py @@ -144,12 +144,16 @@ def copy_overwrite(from_path: AnyStr, to_path: AnyStr): shutil.copytree(from_path, to_path) -def symlink_force(target: AnyStr, link_name: AnyStr): +def make_symlink(target: AnyStr, link_name: AnyStr): try: os.symlink(target, link_name) except OSError as e: if e.errno == errno.EEXIST: - os.remove(link_name) - os.symlink(target, link_name) + if os.path.islink(link_name): + link = os.readlink(link_name) + if os.path.abspath(target) != os.path.abspath(link): + raise e + else: + raise e else: raise e
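
The status helpers above (with_stat_ext, is_finished, finish) are what download_under_lock() and the per-country stages build on. A minimal usage sketch, assuming tools/python is on PYTHONPATH; the directory and file names below are examples only, not values taken from the patch:

# Minimal sketch of the Status lifecycle introduced in status.py; names here are
# illustrative examples, not values used by the generator itself.
import os
import tempfile

from maps_generator.generator import status

status_dir = tempfile.mkdtemp()

# Country status files now share a single naming convention.
assert status.with_stat_ext("Greenland") == "Greenland.status"
assert status.without_stat_ext("Greenland.status") == "Greenland"

# A one-shot action guarded by a status file: it is skipped on re-runs once finished,
# which is how download_under_lock() avoids downloading dependencies twice.
s = status.Status(os.path.join(status_dir, "StageCoastline.download"))
if not s.is_finished():
    # ... do the real work (e.g. download files) here ...
    s.finish()
assert s.is_finished()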
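
make_symlink() (the renamed symlink_force()) changes behaviour: an existing link that already points at the same target is accepted silently, while a link pointing elsewhere now raises instead of being replaced. A small sketch of that behaviour, assuming tools/python is on PYTHONPATH; the paths are throwaway examples:

# Sketch of the make_symlink() behaviour after this patch; paths are illustrative.
import os
import tempfile

from maps_generator.utils.file import make_symlink

tmp = tempfile.mkdtemp()
target_a = os.path.join(tmp, "a.poly")
target_b = os.path.join(tmp, "b.poly")
link = os.path.join(tmp, "borders.poly")
open(target_a, "w").close()
open(target_b, "w").close()

make_symlink(target_a, link)  # creates the symlink
make_symlink(target_a, link)  # same target again: treated as already done

try:
    make_symlink(target_b, link)  # different target: no silent overwrite any more
except OSError:
    print("existing symlink kept; conflicting target rejected")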