[python][generator] Several implementations of mwm python lib.

This commit is contained in:
Maksim Andrianov 2020-03-20 03:35:27 +03:00 committed by mpimenov
parent bbe2aa8c0a
commit 80a99fdccb
14 changed files with 1461 additions and 712 deletions

View file

@ -0,0 +1,25 @@
import os
# Point MWM_RESOURCES_DIR at the "data" directory three levels above this
# file, but only when the caller has not already exported the variable.
os.environ.setdefault(
    "MWM_RESOURCES_DIR",
    os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "..", "..", "..", "data",
    ),
)
try:
from mwm.mwm_pygen import MwmPygen as Mwm
from mwm.mwm_pygen import FeaturePygen as Feature
except ImportError:
from mwm.mwm_native import MwmNative as Mwm
from mwm.mwm_native import FeatureNative as Feature
from mwm.mwm_interface import GeomType
from mwm.mwm_interface import MapType
from mwm.mwm_interface import MetadataField
from mwm.mwm_interface import Point
from mwm.mwm_interface import Rect
from mwm.mwm_interface import RegionDataField
from mwm.mwm_interface import Triangle
from mwm.mwm_native import get_crossmwm
from mwm.mwm_native import get_region_info
from mwm.types import readable_type
from mwm.utils import EnumAsStrEncoder

View file

@ -1,11 +1,15 @@
import argparse
import logging
import sys
from .decode_id import decode_id
from .dump_mwm import dump_mwm
from .find_feature import find_feature
from .ft2osm import ft2osm
from .mwm_feature_compare import compare_mwm
from mwm.decode_id import decode_id
from mwm.dump_mwm import dump_mwm
from mwm.find_feature import find_and_print_features
from mwm.ft2osm import ft2osm
from mwm.mwm_feature_compare import compare_mwm
logger = logging.getLogger("mwm")
logger.setLevel(logging.ERROR)
class Mwm:
@ -19,7 +23,8 @@ The most commonly used mwm commands are:
find_feature Finds features in an mwm file based on a query.
ft2osm Finds an OSM object for a given feature id.
mwm_feature_compare Compares feature count in .mwm files.
""")
""",
)
parser.add_argument("command", help="Subcommand to run")
args = parser.parse_args(sys.argv[1:2])
if not hasattr(self, args.command):
@ -31,9 +36,11 @@ The most commonly used mwm commands are:
@staticmethod
def decode_id():
parser = argparse.ArgumentParser(
description="Unpacks maps.me OSM id to an OSM object link.")
parser.add_argument("--id", type=str, required=True,
help="OsmId or url from osm.org.")
description="Unpacks maps.me OSM id to an OSM object link."
)
parser.add_argument(
"--id", type=str, required=True, help="OsmId or url from osm.org."
)
args = parser.parse_args(sys.argv[2:])
id = decode_id(args.id)
if id is None:
@ -43,43 +50,54 @@ The most commonly used mwm commands are:
@staticmethod
def dump_mwm():
parser = argparse.ArgumentParser(
description="Dumps some MWM structures.")
parser.add_argument("--path", type=str, required=True,
help="Path to mwm.")
parser.add_argument("--format", type=str, default="meta",
choices=("meta", "features", "tags"),
help="Output format.")
parser = argparse.ArgumentParser(description="Dumps some MWM structures.")
parser.add_argument("--path", type=str, required=True, help="Path to mwm.")
parser.add_argument(
"--format",
type=str,
default="str",
choices=("str", "json"),
help="Output format.",
)
parser.add_argument(
"--need_features", action="store_true", help="Need to dump features."
)
args = parser.parse_args(sys.argv[2:])
dump_mwm(args.path, args.format)
dump_mwm(args.path, args.format, args.need_features)
@staticmethod
def find_feature():
parser = argparse.ArgumentParser(
description="Finds features in an mwm file based on a query.")
parser.add_argument("--path", type=str, required=True,
help="Path to mwm.")
parser.add_argument("--type", type=str, required=True,
choices=["t", "et", "n", "m", "id"],
help='''Type:
description="Finds features in an mwm file based on a query."
)
parser.add_argument("--path", type=str, required=True, help="Path to mwm.")
parser.add_argument(
"--type",
type=str,
required=True,
choices=["t", "et", "n", "m", "id"],
help="""Type:
t for inside types ("t hwtag" will find all hwtags-*)
et for exact type ("et shop" won\'t find shop-chemist)
n for names, case-sensitive ("n Starbucks" fo r all starbucks)
m for metadata keys ("m flats" for features with flats
id for feature id ("id 1234" for feature #1234''')
parser.add_argument("--str", type=str, required=True,
help="String to find in mwm")
id for feature id ("id 1234" for feature #1234""",
)
parser.add_argument(
"--str", type=str, required=True, help="String to find in mwm"
)
args = parser.parse_args(sys.argv[2:])
find_feature(args.path, args.type, args.str)
find_and_print_features(args.path, args.type, args.str)
@staticmethod
def ft2osm():
parser = argparse.ArgumentParser(
description="Finds features in an mwm file based on a query.")
parser.add_argument("--path", type=str, required=True,
help="Path to osm to feature mapping.")
parser.add_argument("--id", type=str, required=True,
help="Feature id.")
description="Finds features in an mwm file based on a query."
)
parser.add_argument(
"--path", type=str, required=True, help="Path to osm to feature mapping."
)
parser.add_argument("--id", type=str, required=True, help="Feature id.")
args = parser.parse_args(sys.argv[2:])
id = ft2osm(args.path, args.id)
if id is None:
@ -90,23 +108,27 @@ The most commonly used mwm commands are:
@staticmethod
def mwm_feature_compare():
parser = argparse.ArgumentParser(
description="Compares feature count in .mwm files.")
parser.add_argument("-n", "--new", help="New mwm files path",
type=str, required=True)
parser.add_argument("-o", "--old", help="Old mwm files path",
type=str, required=True)
parser.add_argument("-f", "--feature", help="Feature name to count",
type=str, required=True)
parser.add_argument("-t", "--threshold",
help="Threshold in percent to warn", type=int,
default=20)
description="Compares feature count in .mwm files."
)
parser.add_argument(
"-n", "--new", help="New mwm files path", type=str, required=True
)
parser.add_argument(
"-o", "--old", help="Old mwm files path", type=str, required=True
)
parser.add_argument(
"-f", "--feature", help="Feature name to count", type=str, required=True
)
parser.add_argument(
"-t",
"--threshold",
help="Threshold in percent to warn",
type=int,
default=20,
)
args = parser.parse_args()
if not compare_mwm(args.old, args.new, args.feature,
args.threshold):
print(
"Warning: some .mwm files lost more than {}% booking hotels".format(
args.threshold))
args = parser.parse_args(sys.argv[2:])
compare_mwm(args.old, args.new, args.feature, args.threshold)
Mwm()

View file

@ -1,11 +1,12 @@
import re
from . import mwm
from mwm.ft2osm import OsmIdCode
from mwm.ft2osm import unpack_osmid
def decode_id(id):
if id.isdigit():
osm_id = mwm.unpack_osmid(int(id))
osm_id = unpack_osmid(int(id))
type_abbr = {"n": "node", "w": "way", "r": "relation"}
return f"https://www.openstreetmap.org/{type_abbr[osm_id[0]]}/{osm_id[1]}"
else:
@ -13,11 +14,11 @@ def decode_id(id):
if m:
oid = int(m.group(2))
if m.group(1) == "node":
oid |= mwm.OsmIdCode.NODE
oid |= OsmIdCode.NODE
elif m.group(1) == "way":
oid |= mwm.OsmIdCode.WAY
oid |= OsmIdCode.WAY
elif m.group(1) == "relation":
oid |= mwm.OsmIdCode.RELATION
oid |= OsmIdCode.RELATION
return oid
else:
return None

View file

@ -1,35 +1,21 @@
import json
import os.path
import sys
from .mwm import MWM
from mwm import EnumAsStrEncoder
from mwm import Mwm
def dump_mwm(path, format):
mwm = MWM(open(path, "rb"))
mwm.read_types(os.path.join(os.path.dirname(sys.argv[0]),
"..", "..", "..", "data", "types.txt"))
header = mwm.read_header()
def dump_mwm(path, format, need_features):
mwm = Mwm(path)
if format == "str":
print(mwm)
elif format == "json":
print(json.dumps(mwm.to_json(), ensure_ascii=False, cls=EnumAsStrEncoder))
if format == "meta" or format == "tags":
print("Tags:")
tvv = sorted([(k, v[0], v[1]) for k, v in mwm.tags.items()], key=lambda x: x[1])
for tv in tvv:
print(" {0:<8}: offs {1:9} len {2:8}".format(tv[0], tv[1], tv[2]))
if format == "meta":
v = mwm.read_version()
print("Format: {0}, version: {1}".format(v["fmt"], v["date"].strftime("%Y-%m-%d %H:%M")))
print("Header: {0}".format(header))
print("Region Info: {0}".format(mwm.read_region_info()))
print("Metadata count: {0}".format(len(mwm.read_metadata())))
print("Feature count: {0}".format(len(list(mwm.iter_features()))))
cross = mwm.read_crossmwm()
if cross:
print("Outgoing points: {0}, incoming: {1}".format(len(cross["out"]), len(cross["in"])))
print("Outgoing regions: {0}".format(set(cross["neighbours"])))
elif format == "features":
fts = list(mwm.iter_features())
print("Features:")
for ft in fts:
print(json.dumps(ft, ensure_ascii=False))
if need_features:
for ft in mwm:
if format == "str":
print(ft)
elif format == "json":
print(
json.dumps(ft.to_json(), ensure_ascii=False, cls=EnumAsStrEncoder)
)

View file

@ -1,33 +1,51 @@
import json
import os.path
from typing import List
from .mwm import MWM
from mwm import EnumAsStrEncoder
from mwm import Feature
from mwm import Mwm
from mwm import readable_type
def find_feature(path, typ, string):
mwm = MWM(open(path, "rb"))
mwm.read_header()
mwm.read_types(os.path.join(os.path.dirname(__file__),
"..", "..", "..", "data", "types.txt"))
parse_metadata = typ == "m"
for i, feature in enumerate(mwm.iter_features(metadata=parse_metadata)):
def find_features(path: str, typ: str, string: str) -> List[Feature]:
features = []
for feature in Mwm(path):
found = False
if typ == "n" and "name" in feature["header"]:
for value in feature["header"]["name"].values():
if typ == "n":
for value in feature.names().values():
if string in value:
found = True
break
elif typ in ("t", "et"):
for t in feature["header"]["types"]:
if t == string:
for t in feature.types():
readable_type_ = readable_type(t)
if readable_type_ == string:
found = True
elif typ == "t" and string in t:
break
elif typ == "t" and string in readable_type_:
found = True
elif typ == "m" and "metadata" in feature:
if string in feature["metadata"]:
found = True
elif typ == "id" and i == int(string):
break
elif typ == "m":
for f in feature.metadata():
if string in f.name:
found = True
break
elif typ == "id" and int(string) == feature.index():
found = True
if found:
print(json.dumps(feature, ensure_ascii=False,
sort_keys=True).encode("utf-8"))
features.append(feature)
return features
def find_and_print_features(path: str, typ: str, string: str):
    """Print every feature returned by find_features() as one JSON line.

    The arguments are forwarded unchanged to find_features(). Each feature is
    serialised through its to_json() method with sorted keys, non-ASCII
    characters kept as-is, and EnumAsStrEncoder as the JSON encoder.
    """
    for found in find_features(path, typ, string):
        serialized = json.dumps(
            found.to_json(),
            ensure_ascii=False,
            sort_keys=True,
            cls=EnumAsStrEncoder,
        )
        print(serialized)

View file

@ -1,9 +1,100 @@
from . import mwm
from mwm.mwm_native import read_uint
from mwm.mwm_native import read_varuint
class OsmIdCode:
    """Helpers for packed OSM ids.

    The two most significant bits of a 64-bit value encode the object type
    (NODE, WAY or RELATION); the remaining bits hold the numeric OSM id.
    """

    NODE = 0x4000000000000000
    WAY = 0x8000000000000000
    RELATION = 0xC000000000000000
    # Clears the type bits, leaving only the numeric id.
    RESET = ~(NODE | WAY | RELATION)
    # Selects both type bits. RELATION sets both of them, so every predicate
    # has to compare the whole two-bit field instead of testing one flag;
    # otherwise a relation code would also be reported as a node and a way.
    _TYPE_MASK = NODE | WAY | RELATION

    @staticmethod
    def is_node(code):
        """Return True only when the type bits of ``code`` equal NODE."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.NODE

    @staticmethod
    def is_way(code):
        """Return True only when the type bits of ``code`` equal WAY."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.WAY

    @staticmethod
    def is_relation(code):
        """Return True only when the type bits of ``code`` equal RELATION."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.RELATION

    @staticmethod
    def get_type(code):
        """Return "r", "n" or "w" for ``code``, or None if no type bit is set."""
        if OsmIdCode.is_relation(code):
            return "r"
        elif OsmIdCode.is_node(code):
            return "n"
        elif OsmIdCode.is_way(code):
            return "w"
        return None

    @staticmethod
    def get_id(code):
        """Return ``code`` with the type bits cleared."""
        return code & OsmIdCode.RESET
def unpack_osmid(num):
    """Split a packed OSM id into a ``(type, id)`` tuple.

    Returns None when OsmIdCode.get_type() reports no type for ``num``.
    """
    kind = OsmIdCode.get_type(num)
    return None if kind is None else (kind, OsmIdCode.get_id(num))
def _read_osm2ft_v0(f, ft2osm, tuples):
    """Read the record list of an osm2ft file without a version header.

    A varuint record count is followed by records of an 8-byte osm id, a
    4-byte feature id and 4 filler bytes. With ``tuples`` set the osm id is
    unpacked through unpack_osmid(); records for which that yields None are
    dropped. The result maps feature id -> osm id when ``ft2osm`` is true,
    osm id -> feature id otherwise.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        if ft2osm:
            mapping[feature_id] = osm_id
        else:
            mapping[osm_id] = feature_id
    return mapping
def _read_osm2ft_v1(f, ft2osm, tuples):
    """Read the record list of a version 1 osm2ft file.

    Same layout as the unversioned reader, except that each record carries
    8 additional bytes right after the osm id; they are read and ignored.
    The result maps feature id -> osm id when ``ft2osm`` is true,
    osm id -> feature id otherwise.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        read_uint(f, 8)  # second 8-byte field, not used
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        if ft2osm:
            mapping[feature_id] = osm_id
        else:
            mapping[osm_id] = feature_id
    return mapping
def read_osm2ft(f, ft2osm=False, tuples=True):
    """Reads mwm.osm2ft file, returning a dict of feature id <-> osm id.

    A leading 4-byte value of 0xFFFFFFFF marks the versioned format and is
    followed by a 1-byte version; only version 1 is accepted. Any other
    leading value means the unversioned format, which is re-read from the
    start of the file.
    """
    if read_uint(f, 4) != 0xFFFFFFFF:
        # No marker: those 4 bytes belong to the data, so rewind.
        f.seek(0)
        return _read_osm2ft_v0(f, ft2osm, tuples)
    version = read_uint(f, 1)
    if version != 1:
        raise Exception("Format {0} is not supported".format(version))
    return _read_osm2ft_v1(f, ft2osm, tuples)
def ft2osm(path, ftid):
with open(path, "rb") as f:
ft2osm = mwm.read_osm2ft(f, ft2osm=True)
ft2osm = read_osm2ft(f, ft2osm=True)
type_abbr = {"n": "node", "w": "way", "r": "relation"}
ftid = int(ftid)

View file

@ -1,571 +0,0 @@
# MWM Reader Module
import struct
from datetime import datetime
import math
# Unprocessed sections: geomN, trgN, idx, sdx (search index), addr (search address), offs (feature offsets - succinct)
# Routing sections: mercedes (matrix), daewoo (edge data), infinity (edge id), skoda (shortcuts), chrysler (cross context), ftseg, node2ftseg
# (these mostly are succinct structures, except chrysler and node2ftseg, so no use trying to load them here)
# TODO:
# - Predictive reading of LineStrings
# - Find why polygon geometry is incorrect in iter_features()
# - Find feature ids in the 'dat' section, or find a way to read the 'offs' section
class OsmIdCode:
    """Helpers for packed OSM ids.

    The two most significant bits of a 64-bit value encode the object type
    (NODE, WAY or RELATION); the remaining bits hold the numeric OSM id.
    """

    NODE = 0x4000000000000000
    WAY = 0x8000000000000000
    RELATION = 0xC000000000000000
    # Clears the type bits, leaving only the numeric id.
    RESET = ~(NODE | WAY | RELATION)
    # Selects both type bits. RELATION sets both of them, so every predicate
    # has to compare the whole two-bit field instead of testing one flag;
    # otherwise a relation code would also be reported as a node and a way.
    _TYPE_MASK = NODE | WAY | RELATION

    @staticmethod
    def is_node(code):
        """Return True only when the type bits of ``code`` equal NODE."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.NODE

    @staticmethod
    def is_way(code):
        """Return True only when the type bits of ``code`` equal WAY."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.WAY

    @staticmethod
    def is_relation(code):
        """Return True only when the type bits of ``code`` equal RELATION."""
        return code & OsmIdCode._TYPE_MASK == OsmIdCode.RELATION

    @staticmethod
    def get_type(code):
        """Return 'r', 'n' or 'w' for ``code``, or None if no type bit is set."""
        if OsmIdCode.is_relation(code):
            return 'r'
        elif OsmIdCode.is_node(code):
            return 'n'
        elif OsmIdCode.is_way(code):
            return 'w'
        return None

    @staticmethod
    def get_id(code):
        """Return ``code`` with the type bits cleared."""
        return code & OsmIdCode.RESET
class MWM:
# coding/string_utf8_multilang.cpp
languages = ["default",
"en", "ja", "fr", "ko_rm", "ar", "de", "int_name", "ru", "sv", "zh", "fi", "be", "ka", "ko",
"he", "nl", "ga", "ja_rm", "el", "it", "es", "zh_pinyin", "th", "cy", "sr", "uk", "ca", "hu",
"hsb", "eu", "fa", "br", "pl", "hy", "kn", "sl", "ro", "sq", "am", "fy", "cs", "gd", "sk",
"af", "ja_kana", "lb", "pt", "hr", "fur", "vi", "tr", "bg", "eo", "lt", "la", "kk", "gsw",
"et", "ku", "mn", "mk", "lv", "hi"]
# indexer/feature_meta.hpp
metadata = ["0",
"cuisine", "open_hours", "phone_number", "fax_number", "stars",
"operator", "url", "website", "internet", "ele",
"turn_lanes", "turn_lanes_forward", "turn_lanes_backward", "email", "postcode",
"wikipedia", "maxspeed", "flats", "height", "min_height",
"denomination", "building_levels", "test_id", "ref:sponsored", "price_rate",
"rating", "banner_url", "level", "iata", "brand"]
regiondata = ["languages", "driving", "timezone", "addr_fmt", "phone_fmt", "postcode_fmt", "holidays", "housenames"]
def __init__(self, f):
self.f = f
self.coord_size = None
self.base_point = (0, 0)
self.read_info()
self.type_mapping = []
def read_types(self, filename):
with open(filename, 'r') as ft:
for line in ft:
if len(line.strip()) > 0:
self.type_mapping.append(line.strip().replace('|', '-'))
def read_info(self):
self.f.seek(0)
self.f.seek(self.read_uint(8))
cnt = self.read_varuint()
self.tags = {}
for i in range(cnt):
name = self.read_string(plain=True)
offset = self.read_varuint()
length = self.read_varuint()
self.tags[name] = (offset, length)
def has_tag(self, tag):
return tag in self.tags and self.tags[tag][1] > 0
def seek_tag(self, tag):
self.f.seek(self.tags[tag][0])
def tag_offset(self, tag):
return self.f.tell() - self.tags[tag][0]
def inside_tag(self, tag):
pos = self.tag_offset(tag)
return pos >= 0 and pos < self.tags[tag][1]
def read_version(self):
"""Reads 'version' section."""
self.seek_tag('version')
self.f.read(4) # skip prolog
fmt = self.read_varuint() + 1
version = self.read_varuint()
if version < 161231:
vdate = datetime(2000 + int(version / 10000), int(version / 100) % 100, version % 100)
else:
vdate = datetime.fromtimestamp(version)
version = int(vdate.strftime('%y%m%d'))
return {'fmt': fmt, 'version': version, 'date': vdate}
def read_header(self):
    """Reads 'header' section.

    Side effects: sets self.coord_size and self.base_point, which the
    coordinate helpers rely on.

    Returns a dict with 'basePoint', 'bounds', 'scales', 'langs' and
    'mapType', or an empty dict when the file has no 'header' section.
    """
    if not self.has_tag('header'):
        # Stub for routing files
        self.coord_size = (1 << 30) - 1
        return {}
    self.seek_tag('header')
    result = {}
    coord_bits = self.read_varuint()
    self.coord_size = (1 << coord_bits) - 1
    self.base_point = mwm_bitwise_split(self.read_varuint())
    result['basePoint'] = self.to_4326(self.base_point)
    result['bounds'] = self.read_bounds()
    result['scales'] = self.read_uint_array()
    # Translate language codes to names. The bound must be checked against
    # the code itself (not its position in the array); codes outside the
    # table are kept as raw numbers instead of raising IndexError.
    known = len(self.languages)
    result['langs'] = [
        self.languages[code] if code < known else code
        for code in self.read_uint_array()
    ]
    map_type = self.read_varint()
    map_type_names = {0: 'world', 1: 'worldcoasts', 2: 'country'}
    if map_type in map_type_names:
        result['mapType'] = map_type_names[map_type]
    else:
        result['mapType'] = 'unknown: {0}'.format(map_type)
    return result
# COMPLEX READERS
def read_region_info(self):
if not self.has_tag('rgninfo'):
return {}
fields = {}
self.seek_tag('rgninfo')
sz = self.read_varuint()
if sz:
for i in range(sz):
t = self.read_varuint()
t = self.regiondata[t] if t < len(self.regiondata) else str(t)
fields[t] = self.read_string()
if t == 'languages':
fields[t] = [self.languages[ord(x)] for x in fields[t]]
return fields
def read_metadata(self):
"""Reads 'meta' and 'metaidx' sections."""
if not self.has_tag('metaidx'):
return {}
# Metadata format is different since v8
fmt = self.read_version()['fmt']
# First, read metaidx, to match featureId <-> metadata
self.seek_tag('metaidx')
ftid_meta = []
while self.inside_tag('metaidx'):
ftid = self.read_uint(4)
moffs = self.read_uint(4)
ftid_meta.append((moffs, ftid))
# Sort ftid_meta array
ftid_meta.sort(key=lambda x: x[0])
ftpos = 0
# Now read metadata
self.seek_tag('meta')
metadatar = {}
while self.inside_tag('meta'):
tag_pos = self.tag_offset('meta')
fields = {}
if fmt >= 8:
sz = self.read_varuint()
if sz:
for i in range(sz):
t = self.read_varuint()
t = self.metadata[t] if t < len(self.metadata) else str(t)
fields[t] = self.read_string()
if t == 'fuel':
fields[t] = fields[t].split('\x01')
else:
while True:
t = self.read_uint(1)
is_last = t & 0x80 > 0
t = t & 0x7f
t = self.metadata[t] if t < len(self.metadata) else str(t)
l = self.read_uint(1)
fields[t] = self.f.read(l).decode('utf-8')
if is_last:
break
if len(fields):
while ftpos < len(ftid_meta) and ftid_meta[ftpos][0] < tag_pos:
ftpos += 1
if ftpos < len(ftid_meta):
if ftid_meta[ftpos][0] == tag_pos:
metadatar[ftid_meta[ftpos][1]] = fields
return metadatar
def read_crossmwm(self):
"""Reads 'chrysler' section (cross-mwm routing table)."""
if not self.has_tag('chrysler'):
return {}
self.seek_tag('chrysler')
# Ingoing nodes: array of (nodeId, coord) tuples
incomingCount = self.read_uint(4)
incoming = []
for i in range(incomingCount):
nodeId = self.read_uint(4)
point = self.read_coord(False)
incoming.append((nodeId, point))
# Outgoing nodes: array of (nodeId, coord, outIndex) tuples
# outIndex is an index in neighbours array
outgoingCount = self.read_uint(4)
outgoing = []
for i in range(outgoingCount):
nodeId = self.read_uint(4)
point = self.read_coord(False)
outIndex = self.read_uint(1)
outgoing.append((nodeId, point, outIndex))
# Adjacency matrix: costs of routes for each (incoming, outgoing) tuple
matrix = []
for i in range(incomingCount):
sub = []
for j in range(outgoingCount):
sub.append(self.read_uint(4))
matrix.append(sub)
# List of mwms to which leads each outgoing node
neighboursCount = self.read_uint(4)
neighbours = []
for i in range(neighboursCount):
size = self.read_uint(4)
neighbours.append(self.f.read(size).decode('utf-8'))
return { 'in': incoming, 'out': outgoing, 'matrix': matrix, 'neighbours': neighbours }
class GeomType:
POINT = 0
LINE = 1 << 5
AREA = 1 << 6
POINT_EX = 3 << 5
def iter_features(self, metadata=False):
"""Reads 'dat' section."""
if not self.has_tag('dat'):
return
# TODO: read 'offs'?
md = {}
if metadata:
md = self.read_metadata()
self.seek_tag('dat')
ftid = -1
while self.inside_tag('dat'):
ftid += 1
feature = {'id': ftid}
feature_size = self.read_varuint()
next_feature = self.f.tell() + feature_size
feature['size'] = feature_size
# Header
header = {}
header_bits = self.read_uint(1)
types_count = (header_bits & 0x07) + 1
has_name = header_bits & 0x08 > 0
has_layer = header_bits & 0x10 > 0
has_addinfo = header_bits & 0x80 > 0
geom_type = header_bits & 0x60
types = []
for i in range(types_count):
type_id = self.read_varuint()
if type_id < len(self.type_mapping):
types.append(self.type_mapping[type_id])
else:
types.append(str(type_id + 1)) # So the numbers match with mapcss-mapping.csv
header['types'] = types
if has_name:
header['name'] = self.read_multilang()
if has_layer:
header['layer'] = self.read_uint(1)
if has_addinfo:
if geom_type == MWM.GeomType.POINT:
header['rank'] = self.read_uint(1)
elif geom_type == MWM.GeomType.LINE:
header['ref'] = self.read_string()
elif geom_type == MWM.GeomType.AREA or geom_type == MWM.GeomType.POINT_EX:
header['house'] = self.read_numeric_string()
feature['header'] = header
# Metadata
if ftid in md:
feature['metadata'] = md[ftid]
# Geometry
geometry = {}
if geom_type == MWM.GeomType.POINT or geom_type == MWM.GeomType.POINT_EX:
geometry['type'] = 'Point'
elif geom_type == MWM.GeomType.LINE:
geometry['type'] = 'LineString'
elif geom_type == MWM.GeomType.AREA:
geometry['type'] = 'Polygon'
if geom_type == MWM.GeomType.POINT:
geometry['coordinates'] = list(self.read_coord())
# (flipping table emoticon)
feature['geometry'] = geometry
if False:
if geom_type != MWM.GeomType.POINT:
polygon_count = self.read_varuint()
polygons = []
for i in range(polygon_count):
count = self.read_varuint()
buf = self.f.read(count)
# TODO: decode
geometry['coordinates'] = polygons
feature['coastCell'] = self.read_varint()
# OSM IDs
count = self.read_varuint()
osmids = []
for i in range(count):
encid = self.read_uint(8)
osmids.append('{0}{1}'.format(
OsmIdCode.get_type(encid) or '',
OsmIdCode.get_id(encid)
))
feature['osmIds'] = osmids
if self.f.tell() > next_feature:
raise Exception('Feature parsing error, read too much')
yield feature
self.f.seek(next_feature)
# BITWISE READERS
def read_uint(self, bytelen=1):
return read_uint(self.f, bytelen)
def read_varuint(self):
return read_varuint(self.f)
def read_varint(self):
return read_varint(self.f)
def read_point(self, ref, packed=True):
"""Reads an unsigned point, returns (x, y)."""
if packed:
u = self.read_varuint()
else:
u = self.read_uint(8)
return mwm_decode_delta(u, ref)
def to_4326(self, point):
"""Convert a point in maps.me-mercator CS to WGS-84 (EPSG:4326)."""
if self.coord_size is None:
raise Exception('Call read_header() first.')
merc_bounds = (-180.0, -180.0, 180.0, 180.0) # Xmin, Ymin, Xmax, Ymax
x = point[0] * (merc_bounds[2] - merc_bounds[0]) / self.coord_size + merc_bounds[0]
y = point[1] * (merc_bounds[3] - merc_bounds[1]) / self.coord_size + merc_bounds[1]
y = 360.0 * math.atan(math.tanh(y * math.pi / 360.0)) / math.pi
return (x, y)
def read_coord(self, packed=True):
"""Reads a pair of coords in degrees mercator, returns (lon, lat)."""
point = self.read_point(self.base_point, packed)
return self.to_4326(point)
def read_bounds(self):
"""Reads mercator bounds, returns (min_lon, min_lat, max_lon, max_lat)."""
rmin = mwm_bitwise_split(self.read_varint())
rmax = mwm_bitwise_split(self.read_varint())
pmin = self.to_4326(rmin)
pmax = self.to_4326(rmax)
return (pmin[0], pmin[1], pmax[0], pmax[1])
def read_string(self, plain=False, decode=True):
length = self.read_varuint() + (0 if plain else 1)
s = self.f.read(length)
return s.decode('utf-8') if decode else s
def read_uint_array(self):
length = self.read_varuint()
result = []
for i in range(length):
result.append(self.read_varuint())
return result
def read_numeric_string(self):
sz = self.read_varuint()
if sz & 1 != 0:
return str(sz >> 1)
sz = (sz >> 1) + 1
return self.f.read(sz).decode('utf-8')
def read_multilang(self):
def find_multilang_next(s, i):
i += 1
while i < len(s):
try:
c = ord(s[i])
except:
c = s[i]
if c & 0xC0 == 0x80:
break
if c & 0x80 == 0:
pass
elif c & 0xFE == 0xFE:
i += 6
elif c & 0xFC == 0xFC:
i += 5
elif c & 0xF8 == 0xF8:
i += 4
elif c & 0xF0 == 0xF0:
i += 3
elif c & 0xE0 == 0xE0:
i += 2
elif c & 0xC0 == 0xC0:
i += 1
i += 1
return i
s = self.read_string(decode=False)
langs = {}
i = 0
while i < len(s):
n = find_multilang_next(s, i)
try:
lng = ord(s[i]) & 0x3F
except TypeError:
lng = s[i] & 0x3F
if lng < len(self.languages):
langs[self.languages[lng]] = s[i+1:n].decode('utf-8')
i = n
return langs
def mwm_unshuffle(x):
    """De-interleave the bits of a 32-bit value.

    Bits at even positions end up packed in the low 16 bits and bits at odd
    positions in the high 16 bits. Each step swaps two neighbouring bit
    groups of growing width (1, 2, 4, 8) and leaves the rest in place.
    """
    steps = (
        # (bits to swap, bits to keep, distance)
        (0x22222222, 0x99999999, 1),
        (0x0C0C0C0C, 0xC3C3C3C3, 2),
        (0x00F000F0, 0xF00FF00F, 4),
        (0x0000FF00, 0xFF0000FF, 8),
    )
    for swap_mask, keep_mask, distance in steps:
        moved_up = (x & swap_mask) << distance
        moved_down = (x >> distance) & swap_mask
        x = moved_up | moved_down | (x & keep_mask)
    return x
def mwm_bitwise_split(v):
    """Split a bit-interleaved 64-bit value into an ``(x, y)`` pair.

    Both 32-bit halves are de-interleaved with mwm_unshuffle(); x is built
    from the low 16 bits of each half, y from the high 16 bits of each half.
    """
    upper = mwm_unshuffle(v >> 32)
    lower = mwm_unshuffle(v & 0xFFFFFFFF)
    return (
        ((upper & 0xFFFF) << 16) | (lower & 0xFFFF),
        (upper & 0xFFFF0000) | (lower >> 16),
    )
def mwm_decode_delta(v, ref):
    """Decode ``v`` as a zigzag-encoded (dx, dy) offset and add it to ``ref``.

    ``ref`` is indexed as a two-item sequence; a new ``(x, y)`` tuple is
    returned.
    """
    dx, dy = (zigzag_decode(part) for part in mwm_bitwise_split(v))
    return ref[0] + dx, ref[1] + dy
def read_uint(f, bytelen=1):
    """Read an unsigned integer of ``bytelen`` bytes from binary file ``f``.

    Supported widths are 1, 2, 4 and 8 bytes; any other width raises
    Exception. A short read makes struct.unpack raise struct.error.
    """
    # '<' pins little-endian byte order and standard sizes. Without a prefix
    # struct uses the host's native order, so the same file would decode
    # differently on a big-endian machine.
    # NOTE(review): assumes the files are always written little-endian - confirm.
    formats = {1: '<B', 2: '<H', 4: '<I', 8: '<Q'}
    fmt = formats.get(bytelen)
    if fmt is None:
        raise Exception('Bytelen {0} is not supported'.format(bytelen))
    return struct.unpack(fmt, f.read(bytelen))[0]
def read_varuint(f):
    """Read a variable-length unsigned integer from ``f``.

    Each byte contributes its low 7 bits, least significant group first; a
    set high bit means another byte follows. If the stream ends early, the
    value accumulated so far is returned (0 for an empty stream).
    """
    value = 0
    offset = 0
    while True:
        chunk = f.read(1)
        if not chunk:
            return value
        try:
            byte = ord(chunk)
        except TypeError:
            # read() already handed back something ord() cannot take.
            byte = chunk
        value |= (byte & 0x7F) << offset
        offset += 7
        if byte < 0x80:
            return value
def zigzag_decode(uint):
    """Decode a ZigZag-encoded unsigned integer into a signed one.

    ZigZag maps 0, 1, 2, 3, 4, ... to 0, -1, 1, -2, 2, ...: even inputs are
    non-negative, odd inputs negative. The previous ``-(uint >> 1)`` for odd
    inputs was off by one (1 decoded to 0 and 3 to -1), which made every
    negative value wrong and 0 ambiguous.
    """
    # -(uint & 1) is 0 for even and -1 (all ones) for odd inputs, so the XOR
    # either keeps the shifted value or turns it into -(value) - 1.
    return (uint >> 1) ^ -(uint & 1)
def read_varint(f):
    """Read a varuint from ``f`` and return it decoded by zigzag_decode()."""
    unsigned = read_varuint(f)
    return zigzag_decode(unsigned)
def unpack_osmid(num):
    """Split a packed OSM id into a ``(type, id)`` tuple.

    Returns None when OsmIdCode.get_type() reports no type for ``num``.
    """
    kind = OsmIdCode.get_type(num)
    return None if kind is None else (kind, OsmIdCode.get_id(num))
def _read_osm2ft_v0(f, ft2osm, tuples):
    """Read the record list of an osm2ft file without a version header.

    A varuint record count is followed by records of an 8-byte osm id, a
    4-byte feature id and 4 filler bytes. With ``tuples`` set the osm id is
    unpacked through unpack_osmid(); records for which that yields None are
    dropped. The result maps feature id -> osm id when ``ft2osm`` is true,
    osm id -> feature id otherwise.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        if ft2osm:
            mapping[feature_id] = osm_id
        else:
            mapping[osm_id] = feature_id
    return mapping
def _read_osm2ft_v1(f, ft2osm, tuples):
    """Read the record list of a version 1 osm2ft file.

    Same layout as the unversioned reader, except that each record carries
    8 additional bytes right after the osm id; they are read and ignored.
    The result maps feature id -> osm id when ``ft2osm`` is true,
    osm id -> feature id otherwise.
    """
    mapping = {}
    for _ in range(read_varuint(f)):
        osm_id = read_uint(f, 8)
        read_uint(f, 8)  # second 8-byte field, not used
        if tuples:
            osm_id = unpack_osmid(osm_id)
        feature_id = read_uint(f, 4)
        read_uint(f, 4)  # filler
        if osm_id is None:
            continue
        if ft2osm:
            mapping[feature_id] = osm_id
        else:
            mapping[osm_id] = feature_id
    return mapping
# TODO(zverik, mgsergio): Move this to a separate module, cause it has nothing
# to do with mwm.
def read_osm2ft(f, ft2osm=False, tuples=True):
    """Reads mwm.osm2ft file, returning a dict of feature id <-> osm id.

    A leading 4-byte value of 0xFFFFFFFF marks the versioned format and is
    followed by a 1-byte version; only version 1 is accepted. Any other
    leading value means the unversioned format, which is re-read from the
    start of the file.
    """
    if read_uint(f, 4) != 0xFFFFFFFF:
        # No marker: those 4 bytes belong to the data, so rewind.
        f.seek(0)
        return _read_osm2ft_v0(f, ft2osm, tuples)
    version = read_uint(f, 1)
    if version != 1:
        raise Exception('Format {0} is not supported'.format(version))
    return _read_osm2ft_v1(f, ft2osm, tuples)

View file

@ -1,52 +1,38 @@
import argparse
import multiprocessing
import os
from .mwm import MWM
OMIM_ROOT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "..")
from mwm.find_feature import find_features
def count_feature(mwm_path, feature_name):
mwm = MWM(open(mwm_path, "rb"))
mwm.read_header()
mwm.read_types(os.path.join(OMIM_ROOT, "data", "types.txt"))
counter = 0
for feature in mwm.iter_features():
if feature_name in feature["header"]["types"]:
counter += 1
return counter
def compare_feature_num(old_mwm, new_mwm, name, threshold):
    """Compare how many features of exact type ``name`` two mwm files hold.

    Both files are searched with find_features(..., "et", name). When the
    new file holds fewer such features and the drop, as a percentage of the
    old count, exceeds ``threshold``, a warning is printed and False is
    returned. In every other case the function returns True.
    """
    old_count = len(find_features(old_mwm, "et", name))
    new_count = len(find_features(new_mwm, "et", name))
    delta = new_count - old_count
    if delta < 0:
        # delta < 0 implies old_count > new_count >= 0, so no zero division.
        p_change = float(abs(delta)) / old_count * 100
        if p_change > threshold:
            # The value is a percentage, and the two counts need a separator
            # to stay readable (otherwise e.g. 12 and 10 print as "1210").
            print(
                f'In "{os.path.basename(new_mwm)}" number of "{name}" '
                f"decreased by {round(p_change)}% ({old_count} -> {new_count})"
            )
            return False
    return True
def compare_mwm(old_mwm_path, new_mwm_path, name, threshold):
    """Run compare_feature_num() for every mwm present in both directories.

    Only files ending in ".mwm" whose name does not start with "World" are
    considered, and only those that exist under both ``old_mwm_path`` and
    ``new_mwm_path`` are compared. The comparisons run in a process pool.

    Returns True when every comparison returned True.
    """

    def generate_names(path):
        # file name -> absolute path for every eligible mwm in ``path``
        return {
            file_name: os.path.abspath(os.path.join(path, file_name))
            for file_name in os.listdir(path)
            if file_name.endswith(".mwm") and not file_name.startswith("World")
        }

    old_mwms = generate_names(old_mwm_path)
    new_mwms = generate_names(new_mwm_path)

    # Intersect new with OLD: intersecting new with itself kept files that
    # exist only in the new directory and failed on old_mwms[mwm] below.
    same_mwms = set(new_mwms) & set(old_mwms)
    args = [(old_mwms[mwm], new_mwms[mwm], name, threshold) for mwm in same_mwms]

    # compare_feature_num takes four positional arguments, so each tuple has
    # to be unpacked (starmap); imap would pass the tuple as one argument.
    # The context manager releases the worker processes afterwards.
    with multiprocessing.Pool() as pool:
        return all(pool.starmap(compare_feature_num, args))

View file

@ -0,0 +1,409 @@
import enum
import os
from abc import ABC
from abc import abstractmethod
from typing import Dict
from typing import Iterable
from typing import List
from typing import Union
from mwm.types import readable_type
LANGS = (
"default",
"en",
"ja",
"fr",
"ko_rm",
"ar",
"de",
"int_name",
"ru",
"sv",
"zh",
"fi",
"be",
"ka",
"ko",
"he",
"nl",
"ga",
"ja_rm",
"el",
"it",
"es",
"zh_pinyin",
"th",
"cy",
"sr",
"uk",
"ca",
"hu",
"hsb",
"eu",
"fa",
"br",
"pl",
"hy",
"kn",
"sl",
"ro",
"sq",
"am",
"fy",
"cs",
"gd",
"sk",
"af",
"ja_kana",
"lb",
"pt",
"hr",
"fur",
"vi",
"tr",
"bg",
"eo",
"lt",
"la",
"kk",
"gsw",
"et",
"ku",
"mn",
"mk",
"lv",
"hi",
)
class MetadataField(enum.Enum):
    """Integer codes of the metadata fields a feature can carry.

    NOTE(review): the values presumably mirror the metadata type enum of the
    C++ indexer; confirm before adding or renumbering members.
    """

    cuisine = 1
    open_hours = 2
    phone_number = 3
    fax_number = 4
    stars = 5
    operator = 6
    url = 7
    website = 8
    internet = 9
    ele = 10
    turn_lanes = 11
    turn_lanes_forward = 12
    turn_lanes_backward = 13
    email = 14
    postcode = 15
    wikipedia = 16
    # Value 17 is not assigned to any member.
    flats = 18
    height = 19
    min_height = 20
    denomination = 21
    building_levels = 22
    test_id = 23
    sponsored_id = 24
    price_rate = 25
    rating = 26
    banner_url = 27
    level = 28
    airport_iata = 29
    brand = 30
    duration = 31
class RegionDataField(enum.Enum):
    """Integer codes of the region-level data fields, numbered from 0."""

    languages = 0
    driving = 1
    timezone = 2
    address_format = 3
    phone_format = 4
    postcode_format = 5
    public_holidays = 6
    allow_housenames = 7
class MapType(enum.Enum):
    """Kind of map an mwm file holds: world, world coasts or a country."""

    world = 0
    world_coasts = 1
    country = 2
class GeomType(enum.Enum):
    """Geometry kind of a feature; ``undefined`` (-1) is the only negative code."""

    undefined = -1
    point = 0
    line = 1
    area = 2
class SectionInfo:
    """Name, offset and size of a single mwm section."""

    __slots__ = ("name", "offset", "size")

    def __init__(self, name, offset, size):
        self.name, self.offset, self.size = name, offset, size

    def __repr__(self):
        return (
            f"SectionInfo[name: {self.name}, "
            f"offset: {self.offset}, "
            f"size: {self.size}]"
        )

    def to_json(self):
        """Return the three fields as a plain dict."""
        return {field: getattr(self, field) for field in ("name", "offset", "size")}
class MwmVersion:
    """Format number, timestamp (seconds since epoch) and version of an mwm."""

    __slots__ = ("format", "seconds_since_epoch", "version")

    def __init__(self, format, seconds_since_epoch, version):
        self.format, self.seconds_since_epoch, self.version = (
            format,
            seconds_since_epoch,
            version,
        )

    def __repr__(self):
        return (
            f"MwmVersion[format: {self.format}, "
            f"seconds since epoch: {self.seconds_since_epoch}, "
            f"version: {self.version}]"
        )

    def to_json(self):
        """Return the fields as a dict with camelCase keys."""
        as_json = {"format": self.format}
        as_json["secondsSinceEpoch"] = self.seconds_since_epoch
        as_json["version"] = self.version
        return as_json
class Point:
    """Mutable 2D point with ``x`` and ``y`` coordinates."""

    __slots__ = "x", "y"

    def __init__(self, x=0.0, y=0.0):
        self.x = x
        self.y = y

    def __add__(self, other):
        """Return a new Point that is the component-wise sum.

        Raises NotImplementedError when ``other`` is not a Point.
        """
        if isinstance(other, Point):
            return Point(self.x + other.x, self.y + other.y)
        raise NotImplementedError

    def __iadd__(self, other):
        """Add ``other`` to this point in place and return this point.

        Raises NotImplementedError when ``other`` is not a Point.
        """
        if not isinstance(other, Point):
            raise NotImplementedError
        self.x += other.x
        self.y += other.y
        # In-place operators must return the result; without this return the
        # method fell through to the raise even after a successful update.
        return self

    def __repr__(self):
        return f"({self.x}, {self.y})"

    def to_json(self):
        """Return the coordinates as ``{"x": ..., "y": ...}``."""
        return {"x": self.x, "y": self.y}
class Rect:
    """Rectangle described by its left-bottom and right-top corner points."""

    __slots__ = "left_bottom", "right_top"

    def __init__(self, left_bottom: Point, right_top: Point):
        self.left_bottom, self.right_top = left_bottom, right_top

    def __repr__(self):
        return "Rect[{}, {}]".format(self.left_bottom, self.right_top)

    def to_json(self):
        """Return the rectangle as a plain dict of serialized corners."""
        left_bottom = self.left_bottom.to_json()
        right_top = self.right_top.to_json()
        return {"leftBottom": left_bottom, "rightTop": right_top}
class Triangle:
    """Triangle described by three vertex points named ``x``, ``y`` and ``z``."""

    __slots__ = "x", "y", "z"

    def __init__(self, x: Point, y: Point, z: Point):
        self.x, self.y, self.z = x, y, z

    def __repr__(self):
        return "Triangle[{}, {}, {}]".format(self.x, self.y, self.z)

    def to_json(self):
        """Return the triangle as a plain dict of serialized vertices."""
        return {vertex: getattr(self, vertex).to_json() for vertex in ("x", "y", "z")}
class Mwm(ABC):
    """Abstract interface of an mwm file.

    Subclasses provide the accessors; ``__repr__`` and ``to_json`` are
    implemented here in terms of them.
    """

    def __init__(self, filename: str):
        self.filename = filename

    def name(self) -> str:
        """Return the base name of the file."""
        return os.path.basename(self.filename)

    def path(self) -> str:
        """Return the file name exactly as it was passed to the constructor."""
        return self.filename

    @abstractmethod
    def version(self) -> MwmVersion:
        """Return the version information of the file."""

    @abstractmethod
    def type(self) -> MapType:
        """Return the map type."""

    @abstractmethod
    def bounds(self) -> Rect:
        """Return the bounding rectangle."""

    @abstractmethod
    def sections_info(self) -> Dict[str, SectionInfo]:
        """Return the sections of the file keyed by section name."""

    @abstractmethod
    def __len__(self) -> int:
        """Return the number of features."""

    @abstractmethod
    def __iter__(self) -> Iterable:
        """Return an iterator over the features."""

    def __repr__(self):
        # Sections are listed in the order they are laid out in the file.
        ordered = sorted(self.sections_info().values(), key=lambda x: x.offset)
        si = "\n".join([f" {s}" for s in ordered])
        parts = [
            f"Mwm[\n",
            f" name: {self.name()}\n",
            f" type: {self.type()}\n",
            f" version: {self.version()}\n",
            f" number of features: {len(self)}\n",
            f" bounds: {self.bounds()}\n",
            f" sections info: [\n{si} \n ]\n",
            f"]",
        ]
        return "".join(parts)

    def to_json(self, with_features=False):
        """Return a dict describing the file.

        When ``with_features`` is true, a "features" list holding every
        feature's ``to_json()`` is included as well.
        """
        result = {"name": self.name()}
        result["version"] = self.version().to_json()
        result["type"] = self.type()
        result["bounds"] = self.bounds().to_json()
        result["sections_info"] = {
            key: info.to_json() for key, info in self.sections_info().items()
        }
        result["size"] = len(self)
        if not with_features:
            return result
        result["features"] = [feature.to_json() for feature in self]
        return result
class Feature(ABC):
    """Abstract interface of a single map feature.

    Subclasses provide the accessors; ``__repr__`` and ``to_json`` are
    implemented here in terms of them.
    """

    @abstractmethod
    def index(self) -> int:
        """Return the index of the feature."""
        pass

    @abstractmethod
    def types(self) -> List[int]:
        """Return the numeric types of the feature."""
        pass

    @abstractmethod
    def metadata(self) -> Dict[MetadataField, str]:
        """Return the metadata values keyed by field."""
        pass

    @abstractmethod
    def names(self) -> Dict[str, str]:
        """Return the names of the feature as a str-to-str mapping."""
        pass

    @abstractmethod
    def readable_name(self) -> str:
        """Return a single name chosen for display."""
        pass

    @abstractmethod
    def rank(self) -> int:
        """Return the rank."""
        pass

    @abstractmethod
    def population(self) -> int:
        """Return the population."""
        pass

    @abstractmethod
    def road_number(self) -> str:
        """Return the road number."""
        pass

    @abstractmethod
    def house_number(self) -> str:
        """Return the house number."""
        pass

    @abstractmethod
    def postcode(self) -> str:
        """Return the postcode."""
        pass

    @abstractmethod
    def layer(self) -> int:
        """Return the layer."""
        pass

    @abstractmethod
    def geom_type(self) -> GeomType:
        """Return the geometry kind."""
        pass

    @abstractmethod
    def center(self) -> Point:
        """Return the center point."""
        pass

    @abstractmethod
    def geometry(self) -> Union[List[Point], List[Triangle]]:
        """Return the geometry as a list of points or of triangles."""
        pass

    @abstractmethod
    def limit_rect(self) -> Rect:
        """Return the limit rectangle."""
        pass

    @abstractmethod
    def parse(self):
        """Implementation-defined parsing hook."""
        pass

    def __repr__(self):
        return (
            f"Feature[\n"
            f" index: {self.index()}\n"
            f" readable name: {self.readable_name()}\n"
            f" types: {[readable_type(t) for t in self.types()]}\n"
            f" names: {self.names()}\n"
            f" metadata: {self.metadata()}\n"
            f" geom_type: {self.geom_type()}\n"
            f" center: {self.center()}\n"
            f" limit_rect: {self.limit_rect()}\n"
            f"]"
        )

    def to_json(self):
        """Return a dict describing the feature.

        "center" and "limit_rect" are ``None`` when the corresponding accessor
        returns a falsy value (for example ``None``).
        """
        # Call each accessor exactly once: an implementation may do real work
        # or emit log messages on every call.
        center_ = self.center()
        center = center_.to_json() if center_ else None
        limit_rect_ = self.limit_rect()
        limit_rect = limit_rect_.to_json() if limit_rect_ else None
        return {
            "index": self.index(),
            "types": {t: readable_type(t) for t in self.types()},
            "metadata": {k.name: v for k, v in self.metadata().items()},
            "names": self.names(),
            "readable_name": self.readable_name(),
            "rank": self.rank(),
            "population": self.population(),
            "road_number": self.road_number(),
            "house_number": self.house_number(),
            "postcode": self.postcode(),
            "layer": self.layer(),
            "geom_type": self.geom_type(),
            "center": center,
            "limit_rect": limit_rect,
        }

View file

@ -0,0 +1,482 @@
import logging
import mmap
import struct
from datetime import datetime
from typing import AnyStr
from typing import Dict
from typing import Iterable
from typing import List
from typing import Union
import math
from mwm import mwm_interface as mi
logger = logging.getLogger(__name__)
class MwmNative(mi.Mwm):
    """Pure-Python ``mi.Mwm`` implementation that reads a memory-mapped file."""

    def __init__(self, filename: str, parse: bool = False):
        """Open ``filename`` and read its section table, header and version.

        ``parse`` is accepted but not used by this implementation.
        """
        super().__init__(filename)
        # Nothing in this reader writes to the map, so open and map the file
        # read-only; this also allows loading files without write permission.
        self.f = open(filename, "rb")
        self.file = mmap.mmap(self.f.fileno(), 0, access=mmap.ACCESS_READ)
        self.tags = self._read_sections_info()

        # The header fields below are read sequentially, in this exact order.
        self.seek_tag("header")
        coord_bits = read_varuint(self.file)
        self.coord_size = (1 << coord_bits) - 1
        self.base_point = mwm_bitwise_split(read_varuint(self.file))
        self.bp = to_4326(self.coord_size, self.base_point)
        self.bounds_ = read_bounds(self.file, self.coord_size)
        self.scales = read_uint_array(self.file)
        self.langs = [mi.LANGS[code] for code in read_uint_array(self.file)]
        self.map_type = mi.MapType(read_varint(self.file))

        self.version_ = self._read_version()
        self.metadata_offsets = self._read_metadata_offsets()

    def version(self) -> mi.MwmVersion:
        return self.version_

    def type(self) -> mi.MapType:
        return self.map_type

    def bounds(self) -> mi.Rect:
        return self.bounds_

    def sections_info(self) -> Dict[str, mi.SectionInfo]:
        return self.tags

    def __len__(self) -> int:
        """Count the records of the "dat" section by walking their size prefixes.

        The current position of the mapped file is restored before returning.
        """
        old_pos = self.file.tell()
        self.seek_tag("dat")
        tag_info = self.get_tag("dat")
        pos = tag_info.offset
        end = pos + tag_info.size
        size = 0
        while pos < end:
            self.file.seek(pos)
            feature_size = read_varuint(self.file)
            # Skip the record body; only the size prefix is needed here.
            pos = self.file.tell() + feature_size
            size += 1
        self.file.seek(old_pos)
        return size

    def __iter__(self) -> Iterable:
        assert self.has_tag("dat")
        return MwmNativeIter(self)

    def get_tag(self, name: str) -> mi.SectionInfo:
        """Return the section info for ``name``; raises KeyError if absent."""
        return self.tags[name]

    def seek_tag(self, name: str):
        """Move the mapped file position to the start of section ``name``."""
        self.file.seek(self.tags[name].offset)

    def has_tag(self, name: str) -> bool:
        """Return True when section ``name`` exists and is not empty."""
        return name in self.tags and self.tags[name].size > 0

    def _read_sections_info(self) -> Dict[str, mi.SectionInfo]:
        """Read the section table located at the offset stored in the first 8 bytes."""
        self.file.seek(0)
        self.file.seek(read_uint(self.file, 8))
        tags = {}
        for _ in range(read_varuint(self.file)):
            name = read_string(self.file, plain=True)
            offset = read_varuint(self.file)
            length = read_varuint(self.file)
            tags[name] = mi.SectionInfo(name=name, offset=offset, size=length)
        return tags

    def _read_metadata_offsets(self) -> Dict[int, int]:
        """Read the "metaidx" section into a ``{feature id: offset}`` dict.

        Returns an empty dict when the file has no "metaidx" section, so that
        such files can still be opened.
        """
        if not self.has_tag("metaidx"):
            return {}
        self.seek_tag("metaidx")
        tag_info = self.get_tag("metaidx")
        current = 0
        metadata_offsets = {}
        # Each entry is a pair of 4-byte unsigned integers.
        while current < tag_info.size:
            feature_id = read_uint(self.file, 4)
            offs = read_uint(self.file, 4)
            metadata_offsets[feature_id] = offs
            current += 8
        return metadata_offsets

    def _read_version(self) -> mi.MwmVersion:
        """Read the "version" section."""
        self.seek_tag("version")
        # Skip prolog.
        self.file.read(4)
        fmt = read_varuint(self.file) + 1
        seconds_since_epoch = read_varuint(self.file)
        # NOTE(review): fromtimestamp() converts to local time, so the derived
        # yymmdd number can depend on the machine's time zone - confirm intent.
        vdate = datetime.fromtimestamp(seconds_since_epoch)
        version = int(vdate.strftime("%y%m%d"))
        return mi.MwmVersion(
            format=fmt, seconds_since_epoch=seconds_since_epoch, version=version
        )
class MwmNativeIter:
    """Iterator over the feature records of the "dat" section of an ``MwmNative``."""

    def __init__(self, mwm: MwmNative):
        self.mwm = mwm
        self.index = 0
        tag_info = self.mwm.get_tag("dat")
        self.pos = tag_info.offset
        self.end = self.pos + tag_info.size

    def __iter__(self) -> "MwmNativeIter":
        return self

    def __next__(self) -> "FeatureNative":
        """Return the next feature, or raise StopIteration at the section end."""
        # Stop as soon as the position reaches the end of the section. The
        # strict ``end < pos`` test let one extra record be read from whatever
        # follows the section when ``pos == end``.
        if self.pos >= self.end:
            raise StopIteration
        self.mwm.file.seek(self.pos)
        feature_size = read_varuint(self.mwm.file)
        # Remember where the next record starts before the feature is parsed.
        self.pos = self.mwm.file.tell() + feature_size
        feature = FeatureNative(self.mwm, self.index)
        self.index += 1
        return feature
class GeomType:
    """Bit patterns compared against ``header_bits & 0x60`` of a feature header."""

    POINT = 0x00
    LINE = 0x20
    AREA = 0x40
    POINT_EX = 0x60
class FeatureNative(mi.Feature):
    """Pure-Python ``mi.Feature`` implementation parsed from an ``MwmNative`` file."""

    def __init__(self, mwm: MwmNative, index: int):
        """Parse one feature starting at the current position of ``mwm.file``."""
        self.mwm = mwm
        self._index = index

        # The first byte packs the type count and the presence/geometry flags.
        header_bits = read_uint(self.mwm.file, 1)
        types_count = (header_bits & 0x07) + 1
        has_name = header_bits & 0x08 > 0
        has_layer = header_bits & 0x10 > 0
        has_addinfo = header_bits & 0x80 > 0
        geom_type = header_bits & 0x60

        self._types = [read_varuint(self.mwm.file) for _ in range(types_count)]
        self._names = read_multilang(self.mwm.file) if has_name else {}
        self._layer = read_uint(self.mwm.file, 1) if has_layer else 0

        self._rank = 0
        self._road_number = ""
        self._house_number = ""
        if has_addinfo:
            # The meaning of the additional info depends on the geometry kind.
            if geom_type == GeomType.POINT:
                self._rank = read_uint(self.mwm.file, 1)
            elif geom_type == GeomType.LINE:
                self._road_number = read_string(self.mwm.file)
            elif geom_type == GeomType.AREA or geom_type == GeomType.POINT_EX:
                self._house_number = read_numeric_string(self.mwm.file)

        self._geom_type, self._geometry = self._init_geom(geom_type)

    def readable_name(self) -> str:
        """Return the "default" name, else "en", else the first name, else ""."""
        if "default" in self._names:
            return self._names["default"]
        elif "en" in self._names:
            return self._names["en"]
        elif self._names:
            k = next(iter(self._names))
            return self._names[k]
        return ""

    def population(self) -> int:
        """Not implemented: logs a warning and returns None."""
        logger.warning("Method population() does not have an implementation.")

    def center(self) -> mi.Point:
        """Not implemented: logs a warning and returns None."""
        logger.warning("Method center() does not have an implementation.")

    def limit_rect(self) -> mi.Rect:
        """Not implemented: logs a warning and returns None."""
        logger.warning("Method limit_rect() does not have an implementation.")

    def postcode(self) -> str:
        """Not implemented: logs a warning and returns None."""
        logger.warning("Method postcode() does not have an implementation.")

    def index(self) -> int:
        return self._index

    def types(self) -> List[int]:
        return self._types

    def metadata(self) -> Dict[mi.MetadataField, str]:
        """Read this feature's metadata from the "meta" section.

        Returns an empty dict when there is no offset recorded for the feature.
        The position of the mapped file is restored before returning.
        """
        mwm = self.mwm
        if mwm.metadata_offsets is None or self._index not in mwm.metadata_offsets:
            return {}

        old_pos = mwm.file.tell()
        new_pos = mwm.get_tag("meta").offset + mwm.metadata_offsets[self._index]
        mwm.file.seek(new_pos)
        metadata = {}
        # NOTE(review): mi.MetadataField(t) raises ValueError for an id that
        # the enum does not define - confirm whether that should be tolerated.
        if mwm.version().format >= 8:
            # Count-prefixed list of (field id, string) records.
            sz = read_varuint(mwm.file)
            for _ in range(sz):
                t = read_varuint(mwm.file)
                field = mi.MetadataField(t)
                metadata[field] = read_string(mwm.file)
        else:
            # Older layout: the high bit of the field byte marks the last record.
            while True:
                t = read_uint(mwm.file, 1)
                is_last = t & 0x80 > 0
                t = t & 0x7F
                length = read_uint(mwm.file, 1)
                field = mi.MetadataField(t)
                metadata[field] = mwm.file.read(length).decode("utf-8")
                if is_last:
                    break

        mwm.file.seek(old_pos)
        return metadata

    def names(self) -> Dict[str, str]:
        return self._names

    def rank(self) -> int:
        return self._rank

    def road_number(self) -> str:
        return self._road_number

    def house_number(self) -> str:
        return self._house_number

    def layer(self) -> int:
        return self._layer

    def geom_type(self) -> mi.GeomType:
        return self._geom_type

    def geometry(self) -> Union[List[mi.Point], List[mi.Triangle]]:
        return self._geometry

    def parse(self):
        """No-op: the feature is parsed in the constructor."""
        pass

    def _init_geom(self, t):
        """Map header geometry bits ``t`` to ``(mi.GeomType, geometry list)``.

        Only point geometry is read; for lines and areas a warning is logged
        and the geometry list stays empty.
        """
        geom_type = None
        geometry = []
        if t == GeomType.POINT or t == GeomType.POINT_EX:
            geom_type = mi.GeomType.point
            geometry = [
                read_coord(self.mwm.file, self.mwm.base_point, self.mwm.coord_size)
            ]
        elif t == GeomType.LINE:
            geom_type = mi.GeomType.line
            logger.warning("Method geometry() does not have an implementation for line.")
        elif t == GeomType.AREA:
            geom_type = mi.GeomType.area
            logger.warning("Method geometry() does not have an implementation for area.")
        else:
            geom_type = mi.GeomType.undefined

        return geom_type, geometry
def get_region_info(path):
    """Read the "rgninfo" section of the mwm file at ``path``.

    Returns a dict keyed by ``mi.RegionDataField``; it is empty when the
    section is missing or empty. The ``languages`` value is converted to a
    list of ``mi.LANGS`` entries, all other values are strings.
    """
    m = MwmNative(path)
    if not m.has_tag("rgninfo"):
        return {}

    region_info = {}
    m.seek_tag("rgninfo")
    sz = read_varuint(m.file)
    for _ in range(sz):
        t = read_varuint(m.file)
        field = mi.RegionDataField(t)
        region_info[field] = read_string(m.file)
        # Compare the enum member, not the raw integer: an int never equals an
        # Enum member, so the previous ``t == ...`` test was always False.
        if field == mi.RegionDataField.languages:
            region_info[field] = [mi.LANGS[ord(x)] for x in region_info[field]]
    return region_info
def get_crossmwm(path):
    """Read the "chrysler" section of the mwm file at ``path``.

    Returns a dict with the keys "in", "out", "matrix" and "neighbours", or an
    empty dict when the section is missing or empty.
    """
    m = MwmNative(path)
    if not m.has_tag("chrysler"):
        return {}

    m.seek_tag("chrysler")

    # Ingoing nodes: array of (nodeId, coord) tuples
    incoming_count = read_uint(m.file, 4)
    incoming = [
        (read_uint(m.file, 4), read_coord(m.file, m.base_point, m.coord_size, False))
        for _ in range(incoming_count)
    ]

    # Outgoing nodes: array of (nodeId, coord, outIndex) tuples
    # outIndex is an index in neighbours array
    outgoing_count = read_uint(m.file, 4)
    outgoing = [
        (
            read_uint(m.file, 4),
            read_coord(m.file, m.base_point, m.coord_size, False),
            read_uint(m.file, 1),
        )
        for _ in range(outgoing_count)
    ]

    # Adjacency matrix: costs of routes for each (incoming, outgoing) tuple
    matrix = [
        [read_uint(m.file, 4) for _ in range(outgoing_count)]
        for _ in range(incoming_count)
    ]

    # List of mwms to which leads each outgoing node
    neighbours = []
    for _ in range(read_uint(m.file, 4)):
        name_length = read_uint(m.file, 4)
        neighbours.append(m.file.read(name_length).decode("utf-8"))

    return dict([
        ("in", incoming),
        ("out", outgoing),
        ("matrix", matrix),
        ("neighbours", neighbours),
    ])
def read_point(f, base_point: mi.Point, packed: bool = True) -> mi.Point:
    """Read an encoded point from ``f`` and decode it relative to ``base_point``.

    The value is read as a varuint when ``packed`` is true and as a fixed
    8-byte unsigned integer otherwise.
    """
    if packed:
        encoded = read_varuint(f)
    else:
        encoded = read_uint(f, 8)
    return mwm_decode_delta(encoded, base_point)
def to_4326(coord_size: int, point: mi.Point) -> mi.Point:
    """Convert a point in maps.me-mercator CS to WGS-84 (EPSG:4326)."""
    x_min, y_min, x_max, y_max = -180.0, -180.0, 180.0, 180.0
    # Scale the integer coordinates into the mercator bounds first.
    x = point.x * (x_max - x_min) / coord_size + x_min
    y = point.y * (y_max - y_min) / coord_size + y_min
    # Then turn the mercator y into a latitude.
    y = 360.0 * math.atan(math.tanh(y * math.pi / 360.0)) / math.pi
    return mi.Point(x, y)
def read_coord(
    f, base_point: mi.Point, coord_size: int, packed: bool = True
) -> mi.Point:
    """Read a point from ``f`` and return it converted by ``to_4326``."""
    return to_4326(coord_size, read_point(f, base_point, packed))
def read_bounds(f, coord_size) -> mi.Rect:
    """Read two varint-encoded corners from ``f`` and return them as a ``mi.Rect``.

    Both corners are converted with ``to_4326``; the first one read becomes
    ``left_bottom`` and the second ``right_top``.
    """
    encoded_min = read_varint(f)
    encoded_max = read_varint(f)
    return mi.Rect(
        left_bottom=to_4326(coord_size, mwm_bitwise_split(encoded_min)),
        right_top=to_4326(coord_size, mwm_bitwise_split(encoded_max)),
    )
def read_string(f, plain: bool = False, decode: bool = True) -> AnyStr:
    """Read a length-prefixed string from ``f``.

    The varuint prefix is the byte length when ``plain`` is true and the byte
    length minus one otherwise. The bytes are decoded as UTF-8 unless
    ``decode`` is false, in which case they are returned raw.
    """
    length = read_varuint(f)
    if not plain:
        length += 1
    raw = f.read(length)
    if decode:
        return raw.decode("utf-8")
    return raw
def read_uint_array(f) -> List[int]:
    """Read a varuint count from ``f`` followed by that many varuints."""
    values = []
    for _ in range(read_varuint(f)):
        values.append(read_varuint(f))
    return values
def read_numeric_string(f) -> str:
    """Read a string that may be stored inline as a number.

    If the lowest bit of the varuint header is set, the remaining bits are the
    number itself and its decimal form is returned. Otherwise the remaining
    bits plus one give the length of a UTF-8 string that follows.
    """
    header = read_varuint(f)
    payload = header >> 1
    if header & 1:
        return str(payload)
    return f.read(payload + 1).decode("utf-8")
def read_multilang(f) -> Dict[str, str]:
    """Read a multilingual string from ``f`` as a ``{language: text}`` dict.

    The raw buffer is a sequence of entries, each made of one marker byte whose
    low six bits index ``mi.LANGS``, followed by UTF-8 text. Entries whose
    index is outside ``mi.LANGS`` are skipped.
    """

    def find_multilang_next(s, i):
        """Return the index of the next marker byte after ``i`` (or ``len(s)``)."""
        i += 1
        while i < len(s):
            # Indexing bytes yields an int, so no ord() fallback is needed.
            c = s[i]
            if c & 0xC0 == 0x80:
                # A 10xxxxxx byte outside a multi-byte sequence is a marker.
                break
            if c & 0x80 == 0:
                pass
            # Lead byte of a multi-byte sequence: skip its continuation bytes.
            elif c & 0xFE == 0xFE:
                i += 6
            elif c & 0xFC == 0xFC:
                i += 5
            elif c & 0xF8 == 0xF8:
                i += 4
            elif c & 0xF0 == 0xF0:
                i += 3
            elif c & 0xE0 == 0xE0:
                i += 2
            elif c & 0xC0 == 0xC0:
                i += 1
            i += 1
        return i

    s = read_string(f, decode=False)
    langs = {}
    i = 0
    while i < len(s):
        n = find_multilang_next(s, i)
        lng = s[i] & 0x3F
        if lng < len(mi.LANGS):
            langs[mi.LANGS[lng]] = s[i + 1 : n].decode("utf-8")
        i = n
    return langs
def mwm_unshuffle(x: int) -> int:
    """Apply four mask-and-shift bit-swap stages to ``x`` and return the result."""
    # Each stage is (mask of the bits to swap, shift distance, mask of the bits kept).
    stages = (
        (0x22222222, 1, 0x99999999),
        (0x0C0C0C0C, 2, 0xC3C3C3C3),
        (0x00F000F0, 4, 0xF00FF00F),
        (0x0000FF00, 8, 0xFF0000FF),
    )
    for swap_mask, shift, keep_mask in stages:
        moved_up = (x & swap_mask) << shift
        moved_down = (x >> shift) & swap_mask
        x = moved_up | moved_down | (x & keep_mask)
    return x
def mwm_bitwise_split(v) -> mi.Point:
    """Split the packed integer ``v`` into a ``mi.Point`` of two integers.

    The upper and lower 32-bit halves are unshuffled separately; x is built
    from their low 16 bits and y from their high 16 bits.
    """
    high = mwm_unshuffle(v >> 32)
    low = mwm_unshuffle(v & 0xFFFFFFFF)
    x_part = ((high & 0xFFFF) << 16) | (low & 0xFFFF)
    y_part = (high & 0xFFFF0000) | (low >> 16)
    return mi.Point(x_part, y_part)
def mwm_decode_delta(v, base_point: mi.Point) -> mi.Point:
    """Split ``v`` into a point and return it added to ``base_point``."""
    return mwm_bitwise_split(v) + base_point
def read_uint(f, bytelen: int = 1) -> int:
    """Read a fixed-size little-endian unsigned integer from ``f``.

    Args:
        f: object with a ``read(n)`` method returning bytes.
        bytelen: width of the integer in bytes; one of 1, 2, 4 or 8.

    Raises:
        ValueError: if ``bytelen`` is not a supported width.
        struct.error: if fewer than ``bytelen`` bytes could be read.
    """
    # "<" pins little-endian byte order and standard sizes; a bare format
    # would decode in whatever byte order the host machine uses.
    formats = {1: "<B", 2: "<H", 4: "<I", 8: "<Q"}
    fmt = formats.get(bytelen)
    if fmt is None:
        raise ValueError("Bytelen {0} is not supported".format(bytelen))
    (value,) = struct.unpack(fmt, f.read(bytelen))
    return value
def read_varuint(f) -> int:
    """Read a variable-length unsigned integer from ``f``.

    Bytes are consumed one at a time; the low seven bits of each are appended
    (least significant group first) and a clear high bit ends the number. If
    the input runs out early, the value accumulated so far is returned.
    """
    value = 0
    shift = 0
    while True:
        chunk = f.read(1)
        if not chunk:
            break
        try:
            byte = ord(chunk)
        except TypeError:
            byte = chunk
        value |= (byte & 0x7F) << shift
        shift += 7
        if byte < 0x80:
            break
    return value
def zigzag_decode(uint: int) -> int:
    """Decode a zigzag-encoded unsigned integer into a signed one.

    Zigzag maps 0, 1, 2, 3, 4, ... to 0, -1, 1, -2, 2, ...: even inputs decode
    to ``uint >> 1`` and odd inputs to ``-(uint >> 1) - 1``.
    """
    # For odd input the XOR with -1 flips every bit of the shifted value,
    # which gives -(uint >> 1) - 1; plain negation was off by one.
    return (uint >> 1) ^ -(uint & 1)
def read_varint(f) -> int:
    """Read a varuint from ``f`` and return it passed through ``zigzag_decode``."""
    encoded = read_varuint(f)
    return zigzag_decode(encoded)

View file

@ -0,0 +1,131 @@
from typing import Dict
from typing import Iterable
from typing import List
from typing import Union
from pygen import geometry
from pygen import mwm
from mwm import mwm_interface as mi
class MwmPygen(mi.Mwm):
    """``mi.Mwm`` implementation that delegates to a ``pygen`` ``mwm.Mwm`` object."""

    def __init__(self, filename: str, parse: bool = True):
        """Wrap ``mwm.Mwm(filename, parse)``; ``parse`` is passed through unchanged."""
        super().__init__(filename)
        self.mwm = mwm.Mwm(filename, parse)

    def version(self) -> mi.MwmVersion:
        v = self.mwm.version()
        return mi.MwmVersion(
            format=int(v.format()) + 1,
            seconds_since_epoch=v.seconds_since_epoch(),
            version=v.version(),
        )

    def type(self) -> mi.MapType:
        t = self.mwm.type()
        return mi.MapType(int(t))

    def bounds(self) -> mi.Rect:
        b = self.mwm.bounds()
        return from_pygen_rect(b)

    def sections_info(self) -> Dict[str, mi.SectionInfo]:
        """Return the sections of the file converted to ``mi.SectionInfo``."""
        si = self.mwm.sections_info()
        # The size must come from the section's own size; it used to be filled
        # with the offset by mistake.
        # NOTE(review): assumes the pygen section object exposes ``size``
        # next to ``tag`` and ``offset`` - confirm against the binding.
        return {
            k: mi.SectionInfo(name=v.tag, offset=v.offset, size=v.size)
            for k, v in si.items()
        }

    def __len__(self) -> int:
        return self.mwm.__len__()

    def __iter__(self) -> Iterable:
        return FeaturePygenIter(self.mwm.__iter__())
class FeaturePygenIter:
    """Iterator that wraps each item of a pygen iterator in ``FeaturePygen``."""

    def __init__(self, iter: mwm.MwmIter):
        self.iter = iter

    def __iter__(self) -> "FeaturePygenIter":
        return self

    def __next__(self) -> "FeaturePygen":
        # StopIteration raised by the wrapped iterator propagates unchanged.
        return FeaturePygen(self.iter.__next__())
class FeaturePygen(mi.Feature):
    """``mi.Feature`` implementation that delegates to a pygen feature object.

    Plain values are forwarded as they are; enum-like and geometry values are
    converted to the ``mi`` types.
    """

    def __init__(self, ft: mwm.FeatureType):
        self.ft = ft

    def index(self) -> int:
        return self.ft.index()

    def types(self) -> List[int]:
        return self.ft.types()

    def metadata(self) -> Dict[mi.MetadataField, str]:
        """Return the metadata with keys converted to ``mi.MetadataField``."""
        raw = self.ft.metadata()
        return {mi.MetadataField(int(key)): value for key, value in raw.items()}

    def names(self) -> Dict[str, str]:
        return self.ft.names()

    def readable_name(self) -> str:
        return self.ft.readable_name()

    def rank(self) -> int:
        return self.ft.rank()

    def population(self) -> int:
        return self.ft.population()

    def road_number(self) -> str:
        return self.ft.road_number()

    def house_number(self) -> str:
        return self.ft.house_number()

    def postcode(self) -> str:
        return self.ft.postcode()

    def layer(self) -> int:
        return self.ft.layer()

    def geom_type(self) -> mi.GeomType:
        return mi.GeomType(int(self.ft.geom_type()))

    def center(self) -> mi.Point:
        return from_pygen_point(self.ft.center())

    def geometry(self) -> Union[List[mi.Point], List[mi.Triangle]]:
        """Return triangles for area features and points for everything else."""
        if self.geom_type() == mi.GeomType.area:
            convert = from_pygen_triangle
        else:
            convert = from_pygen_point
        return [convert(item) for item in self.ft.geometry()]

    def limit_rect(self) -> mi.Rect:
        return from_pygen_rect(self.ft.limit_rect())

    def parse(self):
        """Call ``parse()`` on the wrapped feature; returns None."""
        self.ft.parse()
def from_pygen_point(p: geometry.PointD) -> mi.Point:
    """Copy the ``x`` and ``y`` attributes of ``p`` into a new ``mi.Point``."""
    x, y = p.x, p.y
    return mi.Point(x, y)
def from_pygen_rect(r: geometry.RectD) -> mi.Rect:
    """Convert the two corners of ``r`` into a new ``mi.Rect``."""
    left_bottom = from_pygen_point(r.left_bottom)
    right_top = from_pygen_point(r.right_top)
    return mi.Rect(left_bottom, right_top)
def from_pygen_triangle(t: geometry.TriangleD) -> mi.Triangle:
    """Convert the three vertices of ``t`` into a new ``mi.Triangle``."""
    first = from_pygen_point(t.x())
    second = from_pygen_point(t.y())
    third = from_pygen_point(t.z())
    return mi.Triangle(first, second, third)

View file

@ -0,0 +1,137 @@
import logging
import os
import timeit
import mwm
logger = logging.getLogger("mwm")
logger.setLevel(logging.ERROR)
def example__storing_features_in_a_collection(path):
    """Show how to collect features into containers and compare two strategies.

    ``slow`` iterates a map opened with default arguments; ``fast`` opens the
    map with ``False`` as the second argument and calls ``parse()`` only on the
    features it keeps. Both are timed with ``timeit``.
    """
    ft_list = [ft for ft in mwm.Mwm(path)]
    print(f"List size: {len(ft_list)}")

    ft_tuple = tuple(ft for ft in mwm.Mwm(path))
    print(f"Tuple size: {len(ft_tuple)}")

    def slow():
        ft_with_metadata_list = []
        for ft in mwm.Mwm(path):
            if ft.metadata():
                ft_with_metadata_list.append(ft)
        return ft_with_metadata_list

    ft_with_metadata_list = slow()
    print("Features with metadata:", len(ft_with_metadata_list))
    print("First three are:", ft_with_metadata_list[:3])

    def fast():
        ft_with_metadata_list = []
        for ft in mwm.Mwm(path, False):
            if ft.metadata():
                # parse() works in place and does not return the feature, so
                # keep the feature itself rather than the call's result.
                ft.parse()
                ft_with_metadata_list.append(ft)
        return ft_with_metadata_list

    tslow = timeit.timeit(slow, number=10)
    tfast = timeit.timeit(fast, number=10)
    print(f"Slow took {tslow}, fast took {tfast}.")
def example__features_generator(path):
    """Show lazy iteration: print the names of the first features, then feature #10."""

    def make_gen(path):
        return (feature for feature in mwm.Mwm(path))

    print("Names of several first features:")
    for position, ft in enumerate(make_gen(path)):
        print(ft.names())
        if position == 5:
            break

    def return_ft(num):
        # Returns None implicitly when the map has no feature at that position.
        for position, ft in enumerate(mwm.Mwm(path)):
            if position == num:
                return ft

    print(return_ft(10))
def example__sequential_processing(path):
    """Print the readable names that are longer than 100 characters."""
    long_names = [
        ft.readable_name()
        for ft in mwm.Mwm(path)
        if len(ft.readable_name()) > 100
    ]
    print("Long names:", long_names)
def example__working_with_features(path):
    """Print every accessor of the first feature, then one small area geometry."""
    it = iter(mwm.Mwm(path))
    ft = next(it)
    print("Feature members are:", dir(ft))

    print("index:", ft.index())
    readable = [mwm.readable_type(t) for t in ft.types()]
    print("types:", ft.types(), "redable types:", readable)
    # The remaining accessors take no arguments and are labelled by their name.
    accessors = (
        "metadata",
        "names",
        "readable_name",
        "rank",
        "population",
        "road_number",
        "house_number",
        "postcode",
        "layer",
        "geom_type",
        "center",
        "geometry",
        "limit_rect",
    )
    for accessor in accessors:
        print(f"{accessor}:", getattr(ft, accessor)())
    print("__repr__:", ft)

    # Continue with the same iterator and stop at the first small area.
    for ft in it:
        geometry = ft.geometry()
        if ft.geom_type() != mwm.GeomType.area or len(geometry) >= 10:
            continue
        print("area geometry", geometry)
        break
def example__working_with_mwm(path):
    """Print the members and the main properties of the map at ``path``."""
    opened = mwm.Mwm(path)
    print("Mwm members are:", dir(opened))
    print(opened)

    print("version:", opened.version())
    print("type:", opened.type())
    print("bounds:", opened.bounds())
    print("sections_info:", opened.sections_info())
def main(path):
    """Run every example in turn on the map at ``path``."""
    examples = (
        example__storing_features_in_a_collection,
        example__features_generator,
        example__sequential_processing,
        example__working_with_features,
        example__working_with_mwm,
    )
    for example in examples:
        example(path)
if __name__ == "__main__":
    # Run the examples on the sample map, located relative to this script.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    main(os.path.join(script_dir, "..", "..", "..", "data", "minsk-pass.mwm"))

23
tools/python/mwm/types.py Normal file
View file

@ -0,0 +1,23 @@
import os
from typing import Dict
def read_types_mappings() -> Dict[int, str]:
    """Load the type names from ``types.txt`` in ``$MWM_RESOURCES_DIR``.

    Only lines starting with "*" are kept. Each is keyed by its zero-based
    line index; the value is the rest of the line, stripped, with "|"
    replaced by "-".
    """
    types_path = os.path.join(os.environ.get("MWM_RESOURCES_DIR"), "types.txt")
    with open(types_path) as types_file:
        return {
            line_index: line[1:].strip().replace("|", "-")
            for line_index, line in enumerate(types_file)
            if line.startswith("*")
        }
# Built once when this module is imported, so importing the module reads
# types.txt from the directory named by MWM_RESOURCES_DIR.
TYPES_MAPPING = read_types_mappings()
def readable_type(type: int) -> str:
    """Return the name for the numeric ``type``, or "unknown" if it has none."""
    return TYPES_MAPPING.get(type, "unknown")

View file

@ -0,0 +1,9 @@
import enum
import json
class EnumAsStrEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``enum.Enum`` members as their names."""

    def default(self, obj):
        # Everything that is not an enum member is left to the base class,
        # which raises TypeError for unsupported objects.
        if not isinstance(obj, enum.Enum):
            return super().default(obj)
        return obj.name