Create universal serializable transit data format, use it in GTFS processor

This commit is contained in:
Alexey Zakharenkov 2022-12-15 15:17:15 +03:00 committed by Alexey Zakharenkov
parent 9271a0f508
commit 86e65d2115
3 changed files with 262 additions and 137 deletions

View file

@@ -1,3 +1,7 @@
from typing import List, Set
from subway_structure import City, el_center, StopArea
DEFAULT_INTERVAL = 2.5 * 60 # seconds
KMPH_TO_MPS = 1 / 3.6 # km/h to m/s conversion multiplier
SPEED_ON_TRANSFER = 3.5 * KMPH_TO_MPS # m/s
@@ -7,3 +11,98 @@ TRANSFER_PENALTY = 30 # seconds
def format_colour(colour):
"""Truncate leading # sign."""
return colour[1:] if colour else None
def transit_to_dict(
    cities: List[City], transfers: List[Set[StopArea]]
) -> dict:
    """Get data for good cities as a dictionary.

    Only cities with ``is_good`` set are included. The result has three
    top-level keys:

    - "stopareas": stoparea id => stoparea data (id, center, name, entrances)
    - "networks": city name => network data (id, name, routes)
    - "transfers": set of (stoparea_id1, stoparea_id2) tuples, id1 < id2;
      only pairs where both stopareas participate in some route are kept

    :param cities: City instances to serialize
    :param transfers: list of sets of StopArea objects that are
        interconnected by pedestrian transfers
    """
    data = {
        "stopareas": {},  # stoparea id => stoparea data
        "networks": {},  # city name => city data
        "transfers": {},  # set(tuple(stoparea_id1, stoparea_id2)), id1<id2
    }

    for city in (c for c in cities if c.is_good):
        network = {
            "id": city.id,
            "name": city.name,
            "routes": [],
        }
        # Iterating a City yields its route masters; iterating a route
        # master yields its route variants (itineraries).
        for route_master in city:
            route_data = {
                "id": route_master.id,
                "mode": route_master.mode,
                "ref": route_master.ref,
                "name": route_master.name,
                "colour": route_master.colour,
                "infill": route_master.infill,
                "itineraries": [],
            }
            for route in route_master:
                variant_data = {
                    "id": route.id,
                    "tracks": route.get_tracks_geometry(),
                    "start_time": route.start_time,
                    "end_time": route.end_time,
                    "interval": route.interval,
                    "stops": [
                        {
                            "stoparea_id": route_stop.stoparea.id,
                            "distance": route_stop.distance,
                        }
                        for route_stop in route.stops
                    ],
                }

                # Store stopareas participating in the route
                # and that have not been stored yet
                for route_stop in route.stops:
                    stoparea = route_stop.stoparea
                    if stoparea.id in data["stopareas"]:
                        continue
                    stoparea_data = {
                        "id": stoparea.id,
                        "center": stoparea.center,
                        "name": stoparea.station.name,
                        # Entrances and exits are merged into one
                        # "entrances" list; egress ids reference OSM
                        # elements stored in city.elements.
                        "entrances": [
                            {
                                "id": egress_id,
                                "name": egress["tags"].get("name"),
                                "ref": egress["tags"].get("ref"),
                                "center": el_center(egress),
                            }
                            for (egress_id, egress) in (
                                (egress_id, city.elements[egress_id])
                                for egress_id in stoparea.entrances
                                | stoparea.exits
                            )
                        ],
                    }
                    data["stopareas"][stoparea.id] = stoparea_data

                route_data["itineraries"].append(variant_data)
            network["routes"].append(route_data)
        data["networks"][city.name] = network

    # transfers: expand each transfer set into ordered id pairs, keeping
    # only pairs whose both ends were stored above
    pairwise_transfers = set()
    for stoparea_set in transfers:
        stoparea_list = list(stoparea_set)
        for first_i in range(len(stoparea_list) - 1):
            for second_i in range(first_i + 1, len(stoparea_list)):
                stoparea1_id = stoparea_list[first_i].id
                stoparea2_id = stoparea_list[second_i].id
                if all(
                    st_id in data["stopareas"]
                    for st_id in (stoparea1_id, stoparea2_id)
                ):
                    id1, id2 = sorted([stoparea1_id, stoparea2_id])
                    pairwise_transfers.add((id1, id2))

    # NOTE(review): "transfers" was initialised as a dict but is replaced
    # here with a set of tuples, as the comment at the top describes.
    data["transfers"] = pairwise_transfers
    return data

View file

@@ -1,19 +1,22 @@
import csv
import io
import zipfile
from functools import partial
from io import BytesIO, StringIO
from itertools import permutations
from tarfile import TarFile, TarInfo
from typing import List, Set
from zipfile import ZipFile
from ._common import (
DEFAULT_INTERVAL,
format_colour,
SPEED_ON_TRANSFER,
TRANSFER_PENALTY,
transit_to_dict,
)
from subway_structure import (
City,
distance,
el_center,
StopArea,
)
@@ -136,19 +139,11 @@ def round_coords(coords_tuple):
)
def process(cities, transfers, filename, cache_path):
"""Generate all output and save to file.
:param cities: List of City instances
:param transfers: List of sets of StopArea.id
:param filename: Path to file to save the result
:param cache_path: Path to json-file with good cities cache or None.
"""
# TODO: make universal cache for all processors, and apply the cache to GTFS
def transit_data_to_gtfs(data):
# Keys correspond GTFS file names
gtfs_data = {key: [] for key in GTFS_COLUMNS.keys()}
# GTFS calendar
gtfs_data["calendar"].append(
{
"service_id": "always",
@@ -164,129 +159,109 @@ def process(cities, transfers, filename, cache_path):
}
)
all_stops = {} # stop (stop area center or station) el_id -> stop data
good_cities = [c for c in cities if c.is_good]
# GTFS stops
for stoparea_id, stoparea_data in data["stopareas"].items():
station_id = f"{stoparea_id}_st"
station_name = stoparea_data["name"]
station_center = round_coords(stoparea_data["center"])
station_gtfs = {
"stop_id": station_id,
"stop_code": station_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 1, # station in GTFS terms
}
gtfs_data["stops"].append(station_gtfs)
def add_stop_gtfs(route_stop, city):
"""Add stop to all_stops.
If it's not a station, also add parent station
if it has not been added yet. Return gtfs stop_id.
"""
platform_id = f"{stoparea_id}_plt"
platform_gtfs = {
"stop_id": platform_id,
"stop_code": platform_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 0, # stop/platform in GTFS terms
"parent_station": station_id,
}
gtfs_data["stops"].append(platform_gtfs)
# For the case a StopArea is derived solely from railway=station
# object, we generate GTFS platform (stop), station and sometimes
# an entrance from the same object, so use suffixes
station_id = f"{route_stop.stoparea.id}_st"
platform_id = f"{route_stop.stoparea.id}_plt"
if station_id not in all_stops:
station_name = route_stop.stoparea.station.name
station_center = round_coords(route_stop.stoparea.center)
station_gtfs = {
"stop_id": station_id,
"stop_code": station_id,
if not stoparea_data["entrances"]:
entrance_id = f"{stoparea_id}_egress"
entrance_gtfs = {
"stop_id": entrance_id,
"stop_code": entrance_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 1, # station in GTFS terms
}
all_stops[station_id] = station_gtfs
platform_gtfs = {
"stop_id": platform_id,
"stop_code": platform_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 0, # stop/platform in GTFS terms
"location_type": 2,
"parent_station": station_id,
}
all_stops[platform_id] = platform_gtfs
osm_entrance_ids = (
route_stop.stoparea.entrances | route_stop.stoparea.exits
)
if not osm_entrance_ids:
entrance_id = f"{route_stop.stoparea.id}_egress"
gtfs_data["stops"].append(entrance_gtfs)
else:
for entrance in stoparea_data["entrances"]:
entrance_id = f"{entrance['id']}_{stoparea_id}"
entrance_name = entrance["name"]
if not entrance["name"]:
entrance_name = station_name
ref = entrance["ref"]
if ref:
entrance_name += f" {ref}"
center = round_coords(entrance["center"])
entrance_gtfs = {
"stop_id": entrance_id,
"stop_code": entrance_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"stop_name": entrance_name,
"stop_lat": center[1],
"stop_lon": center[0],
"location_type": 2,
"parent_station": station_id,
}
all_stops[entrance_id] = entrance_gtfs
else:
for osm_entrance_id in osm_entrance_ids:
entrance = city.elements[osm_entrance_id]
entrance_id = f"{osm_entrance_id}_{route_stop.stoparea.id}"
entrance_name = entrance["tags"].get("name")
if not entrance_name:
entrance_name = station_name
ref = entrance["tags"].get("ref")
if ref:
entrance_name += f" {ref}"
center = el_center(entrance)
center = round_coords(center)
entrance_gtfs = {
"stop_id": entrance_id,
"stop_code": entrance_id,
"stop_name": entrance_name,
"stop_lat": center[1],
"stop_lon": center[0],
"location_type": 2,
"parent_station": station_id,
}
all_stops[entrance_id] = entrance_gtfs
return platform_id
gtfs_data["stops"].append(entrance_gtfs)
# agency, routes, trips, stop_times, frequencies, shapes
for city in good_cities:
agency = {"agency_id": city.id, "agency_name": city.name}
for network in data["networks"].values():
agency = {
"agency_id": network["id"],
"agency_name": network["name"],
}
gtfs_data["agency"].append(agency)
for city_route in city:
for route_master in network["routes"]:
route = {
"route_id": city_route.id,
"agency_id": agency["agency_id"],
"route_type": 12 if city_route.mode == "monorail" else 1,
"route_short_name": city_route.ref,
"route_long_name": city_route.name,
"route_color": format_colour(city_route.colour),
"route_id": route_master["id"],
"agency_id": network["id"],
"route_type": 12 if route_master["mode"] == "monorail" else 1,
"route_short_name": route_master["ref"],
"route_long_name": route_master["name"],
"route_color": format_colour(route_master["colour"]),
}
gtfs_data["routes"].append(route)
for variant in city_route:
shape_id = variant.id[1:] # truncate leading 'r'
for itinerary in route_master["itineraries"]:
shape_id = itinerary["id"][1:] # truncate leading 'r'
trip = {
"trip_id": variant.id,
"route_id": route["route_id"],
"trip_id": itinerary["id"],
"route_id": route_master["id"],
"service_id": "always",
"shape_id": shape_id,
}
gtfs_data["trips"].append(trip)
tracks = variant.get_extended_tracks()
tracks = variant.get_truncated_tracks(tracks)
for i, (lon, lat) in enumerate(tracks):
for i, (lon, lat) in enumerate(itinerary["tracks"]):
lon, lat = round_coords((lon, lat))
gtfs_data["shapes"].append(
{
"shape_id": shape_id,
"trip_id": variant.id,
"trip_id": itinerary["id"],
"shape_pt_lat": lat,
"shape_pt_lon": lon,
"shape_pt_sequence": i,
}
)
start_time = variant.start_time or DEFAULT_TRIP_START_TIME
end_time = variant.end_time or DEFAULT_TRIP_END_TIME
start_time = itinerary["start_time"] or DEFAULT_TRIP_START_TIME
end_time = itinerary["end_time"] or DEFAULT_TRIP_END_TIME
if end_time <= start_time:
end_time = (end_time[0] + 24, end_time[1])
start_time = f"{start_time[0]:02d}:{start_time[1]:02d}:00"
@@ -294,51 +269,66 @@ def process(cities, transfers, filename, cache_path):
gtfs_data["frequencies"].append(
{
"trip_id": variant.id,
"trip_id": itinerary["id"],
"start_time": start_time,
"end_time": end_time,
"headway_secs": variant.interval
"headway_secs": itinerary["interval"]
or DEFAULT_INTERVAL,
}
)
for stop_sequence, route_stop in enumerate(variant):
gtfs_platform_id = add_stop_gtfs(route_stop, city)
for i, route_stop in enumerate(itinerary["stops"]):
platform_id = f"{route_stop['stoparea_id']}_plt"
gtfs_data["stop_times"].append(
{
"trip_id": variant.id,
"stop_sequence": stop_sequence,
"shape_dist_traveled": route_stop.distance,
"stop_id": gtfs_platform_id,
"trip_id": itinerary["id"],
"stop_sequence": i,
"shape_dist_traveled": route_stop["distance"],
"stop_id": platform_id,
}
)
# stops
gtfs_data["stops"].extend(all_stops.values())
# transfers
for stoparea_set in transfers:
for stoparea1 in stoparea_set:
for stoparea2 in stoparea_set:
if stoparea1.id < stoparea2.id:
stop1_id = f"{stoparea1.id}_st"
stop2_id = f"{stoparea2.id}_st"
if not {stop1_id, stop2_id}.issubset(all_stops):
continue
transfer_time = TRANSFER_PENALTY + round(
distance(stoparea1.center, stoparea2.center)
/ SPEED_ON_TRANSFER
)
for id1, id2 in permutations((stop1_id, stop2_id)):
gtfs_data["transfers"].append(
{
"from_stop_id": id1,
"to_stop_id": id2,
"transfer_type": 0,
"min_transfer_time": transfer_time,
}
)
for stoparea1_id, stoparea2_id in data["transfers"]:
stoparea1 = data["stopareas"][stoparea1_id]
stoparea2 = data["stopareas"][stoparea2_id]
transfer_time = TRANSFER_PENALTY + round(
distance(stoparea1["center"], stoparea2["center"])
/ SPEED_ON_TRANSFER
)
gtfs_sa_id1 = f"{stoparea1['id']}_st"
gtfs_sa_id2 = f"{stoparea2['id']}_st"
for id1, id2 in permutations((gtfs_sa_id1, gtfs_sa_id2)):
gtfs_data["transfers"].append(
{
"from_stop_id": id1,
"to_stop_id": id2,
"transfer_type": 0,
"min_transfer_time": transfer_time,
}
)
return gtfs_data
def process(
    cities: List[City],
    transfers: List[Set[StopArea]],
    filename: str,
    cache_path: str,
):
    """Generate all output and save to file.

    :param cities: List of City instances
    :param transfers: List of sets of StopArea objects
    :param filename: Path to file to save the result
    :param cache_path: Path to json-file with good cities cache or None.
    """
    # Serialize to the universal transit dict, then convert to GTFS tables.
    transit_data = transit_to_dict(cities, transfers)
    gtfs_data = transit_data_to_gtfs(transit_data)

    # TODO: make universal cache for all processors, and apply the cache to GTFS

    make_gtfs(filename, gtfs_data)
@@ -353,19 +343,50 @@ def dict_to_row(dict_data: dict, record_type: str) -> list:
]
def make_gtfs(filename, gtfs_data):
if not filename.lower().endswith("zip"):
def make_gtfs(filename: str, gtfs_data: dict, fmt: str = None) -> None:
    """Write the GTFS feed to a zip or tar archive.

    :param filename: output archive path
    :param gtfs_data: dict of GTFS feature name => list of row dicts
    :param fmt: "zip" or "tar"; when falsy, inferred from the filename —
        ".tar" suffix selects tar, anything else selects zip
    """
    if not fmt:
        fmt = "tar" if filename.endswith(".tar") else "zip"

    if fmt == "zip":
        make_gtfs_zip(filename, gtfs_data)
    else:
        make_gtfs_tar(filename, gtfs_data)
def make_gtfs_zip(filename: str, gtfs_data: dict) -> None:
    """Write the GTFS feed as a zip archive, one <feature>.txt per table.

    :param filename: output path; ".zip" is appended if missing
    :param gtfs_data: dict of GTFS feature name => list of row dicts
    """
    if not filename.lower().endswith(".zip"):
        # Keep the caller's path, just normalise the extension.
        filename = f"{filename}.zip"

    with ZipFile(filename, "w") as zf:
        for gtfs_feature, columns in GTFS_COLUMNS.items():
            with StringIO(newline="") as string_io:
                writer = csv.writer(string_io, delimiter=",")
                writer.writerow(columns)
                writer.writerows(
                    map(
                        partial(dict_to_row, record_type=gtfs_feature),
                        gtfs_data[gtfs_feature],
                    )
                )
                zf.writestr(f"{gtfs_feature}.txt", string_io.getvalue())
def make_gtfs_tar(filename: str, gtfs_data: dict) -> None:
    """Write the GTFS feed as an uncompressed tar archive,
    one <feature>.txt per table.

    :param filename: output path; ".tar" is appended if missing
    :param gtfs_data: dict of GTFS feature name => list of row dicts
    """
    if not filename.lower().endswith(".tar"):
        # Keep the caller's path, just normalise the extension.
        filename = f"{filename}.tar"

    with TarFile(filename, "w") as tf:
        for gtfs_feature, columns in GTFS_COLUMNS.items():
            with StringIO(newline="") as string_io:
                writer = csv.writer(string_io, delimiter=",")
                writer.writerow(columns)
                writer.writerows(
                    map(
                        partial(dict_to_row, record_type=gtfs_feature),
                        gtfs_data[gtfs_feature],
                    )
                )
                # tarfile has no writestr() equivalent: build a TarInfo
                # with an explicit size and add the CSV bytes from memory.
                tarinfo = TarInfo(f"{gtfs_feature}.txt")
                data = string_io.getvalue().encode()
                tarinfo.size = len(data)
                tf.addfile(tarinfo, BytesIO(data))

View file

@@ -1157,6 +1157,11 @@ class Route:
return tracks
def get_tracks_geometry(self):
    """Return the route's track geometry: extended first, then truncated."""
    extended_tracks = self.get_extended_tracks()
    return self.get_truncated_tracks(extended_tracks)
def check_stops_order_by_angle(self):
disorder_warnings = []
disorder_errors = []