| | |
| |
|
| | import json |
| | import re |
| | from dataclasses import dataclass, field |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional |
| |
|
| | from lxml import etree |
| | import numpy as np |
| |
|
| | from utils.geo import BoundaryBox, Projection |
| |
|
# Length tag values of the form "<number><optional whitespace><unit>".
# The numeric part is captured in the named group "value"; both the integer
# and the fractional digits are optional in the pattern, so callers still have
# to validate the captured text before using it as a number.
METERS_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*m$")
KILOMETERS_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*km$")
MILES_PATTERN: re.Pattern = re.compile(r"^(?P<value>\d*\.?\d*)\s*mi$")
| |
|
| |
|
def parse_float(string: str) -> Optional[float]:
    """
    Convert the textual form of a float or integer into a float.

    :param string: text to convert
    :return: the parsed number, or None when `float()` rejects the input with
        a TypeError or ValueError
    """
    try:
        parsed: Optional[float] = float(string)
    except (TypeError, ValueError):
        parsed = None
    return parsed
| |
|
| |
|
@dataclass(eq=False)
class OSMElement:
    """
    Base of the OpenStreetMap entities: an identifier plus a string-to-string
    tag mapping.

    Declared with `eq=False`, so equality is by object identity; the hash is
    the element id.
    """

    id_: int
    tags: Dict[str, str]

    def get_float(self, key: str) -> Optional[float]:
        """
        Read a tag as a number.

        :param key: tag name
        :return: the tag value parsed by `parse_float`, or None when the tag
            is absent or its value is not numeric
        """
        return parse_float(self.tags[key]) if key in self.tags else None

    def get_length(self, key: str) -> Optional[float]:
        """
        Read a tag as a length in meters.

        A bare number is returned unchanged; values suffixed with "m", "km"
        or "mi" are scaled to meters.

        :param key: tag name
        :return: length in meters, or None when the tag is absent or its
            value matches none of the supported forms
        """
        if key not in self.tags:
            return None

        text = self.tags[key]

        # A plain number carries no unit and is returned as is.
        plain = parse_float(text)
        if plain is not None:
            return plain

        unit_table = (
            (METERS_PATTERN, 1.0),
            (KILOMETERS_PATTERN, 1000.0),
            (MILES_PATTERN, 1609.344),
        )
        for unit_pattern, meters_per_unit in unit_table:
            found = unit_pattern.match(text)
            if not found:
                continue
            # The patterns also accept an empty or "." numeric part, which
            # parse_float rejects; such values fall through to the next unit.
            amount = parse_float(found.group("value"))
            if amount is not None:
                return amount * meters_per_unit

        return None

    def __hash__(self) -> int:
        return self.id_
| |
|
| |
|
@dataclass(eq=False)
class OSMNode(OSMElement):
    """
    OpenStreetMap node: a tagged element with a geographic position.

    See https://wiki.openstreetmap.org/wiki/Node
    """

    # Geographic coordinates; `from_dict` stores them as (lat, lon).
    geo: np.ndarray
    # Raw "visible" attribute of the element, if the input provided one.
    visible: Optional[str] = None
    # Projected coordinates; not set by `from_dict`.
    xy: Optional[np.ndarray] = None

    @classmethod
    def from_dict(cls, structure: Dict[str, Any]) -> "OSMNode":
        """
        Build a node from an Overpass-like structure.

        :param structure: mapping with "id", "lat" and "lon", and optionally
            "tags" and "visible"
        :return: the node, with empty tags when "tags" is missing
        """
        node_id = structure["id"]
        node_tags = structure.get("tags", {})
        lat_lon = np.array((structure["lat"], structure["lon"]))
        return cls(
            node_id,
            node_tags,
            geo=lat_lon,
            visible=structure.get("visible"),
        )
| |
|
| |
|
@dataclass(eq=False)
class OSMWay(OSMElement):
    """
    OpenStreetMap way: a tagged element holding an ordered list of nodes.

    See https://wiki.openstreetmap.org/wiki/Way
    """

    # Nodes of the way, in order; empty by default.
    nodes: Optional[List[OSMNode]] = field(default_factory=list)
    # Raw "visible" attribute of the element, if the input provided one.
    visible: Optional[str] = None

    @classmethod
    def from_dict(
        cls, structure: Dict[str, Any], nodes: Dict[int, OSMNode]
    ) -> "OSMWay":
        """
        Parse way from Overpass-like structure.

        :param structure: mapping with "id" and "nodes" (a list of node ids),
            and optionally "tags" and "visible"
        :param nodes: already parsed nodes, keyed by node id
        :return: the way, with its node ids resolved to node objects
        :raises KeyError: if "id" or "nodes" is missing, or a referenced node
            id is not present in `nodes`
        """
        return cls(
            structure["id"],
            structure.get("tags", {}),
            [nodes[x] for x in structure["nodes"]],
            visible=structure.get("visible"),
        )

    def is_cycle(self) -> bool:
        """
        Is way a cycle way or an area boundary.

        :return: True when the first and the last node are the same node;
            False for a way without nodes
        """
        # A way may be built without nodes (the field defaults to an empty
        # list); indexing it would raise, and such a way is not closed.
        if not self.nodes:
            return False
        return self.nodes[0] == self.nodes[-1]

    def __repr__(self) -> str:
        return f"Way <{self.id_}> {self.nodes}"
| |
|
| |
|
@dataclass
class OSMMember:
    """
    One entry in the member list of an OpenStreetMap relation.
    """

    # Kind of the referenced element, as given by the input's "type" field.
    type_: str
    # Identifier of the referenced element.
    ref: int
    # Role string of the member within the relation.
    role: str
| |
|
| |
|
@dataclass(eq=False)
class OSMRelation(OSMElement):
    """
    OpenStreetMap relation: a tagged element holding a list of members.

    See https://wiki.openstreetmap.org/wiki/Relation
    """

    # Members of the relation, in input order.
    members: Optional[List[OSMMember]]
    # Raw "visible" attribute of the element, if the input provided one.
    visible: Optional[str] = None

    @classmethod
    def from_dict(cls, structure: Dict[str, Any]) -> "OSMRelation":
        """
        Parse relation from Overpass-like structure.

        :param structure: mapping with "id" and "members" (a list of mappings
            with "type", "ref" and "role"), and optionally "tags" and
            "visible"
        :return: the relation, with empty tags when "tags" is missing
        :raises KeyError: if "id" or "members" is missing, or a member lacks
            "type", "ref" or "role"
        """
        members = [
            OSMMember(x["type"], x["ref"], x["role"])
            for x in structure["members"]
        ]
        return cls(
            structure["id"],
            # Tags are optional here as they are for nodes and ways: an
            # element without tags must not fail to parse.
            structure.get("tags", {}),
            members,
            visible=structure.get("visible"),
        )
| |
|
| |
|
class OSMData:
    """
    The whole OpenStreetMap information about nodes, ways, and relations.
    """

    def __init__(self) -> None:
        self.nodes: Dict[int, OSMNode] = {}
        self.ways: Dict[int, OSMWay] = {}
        self.relations: Dict[int, OSMRelation] = {}
        # Stays None unless the input provides "bounds".
        self.box: Optional[BoundaryBox] = None

    @classmethod
    def from_dict(cls, structure: Dict[str, Any]) -> "OSMData":
        """
        Build the data set from an Overpass-like structure.

        :param structure: mapping with "elements" (a list of mappings, each
            with a "type" of "node", "way" or "relation") and optionally
            "bounds" (with "minlat", "minlon", "maxlat", "maxlon")
        :return: the populated data set
        :raises ValueError: if two elements of the same kind share an id
        """
        data = cls()
        bounds = structure.get("bounds")
        if bounds is not None:
            data.box = BoundaryBox(
                np.array([bounds["minlat"], bounds["minlon"]]),
                np.array([bounds["maxlat"], bounds["maxlon"]]),
            )

        elements = structure["elements"]
        # Nodes are loaded in a first pass because ways resolve their node
        # ids against `data.nodes`, whatever the order of the input.
        for element in elements:
            if element["type"] == "node":
                data.add_node(OSMNode.from_dict(element))
        for element in elements:
            if element["type"] == "way":
                data.add_way(OSMWay.from_dict(element, data.nodes))
        for element in elements:
            if element["type"] == "relation":
                data.add_relation(OSMRelation.from_dict(element))

        return data

    @classmethod
    def from_json(cls, path: Path) -> "OSMData":
        """
        Load the data set from a UTF-8 JSON file.

        :param path: file holding the structure accepted by `from_dict`
        """
        with path.open(encoding='utf-8') as fid:
            structure = json.load(fid)
        return cls.from_dict(structure)

    @staticmethod
    def _xml_tags(elem) -> Dict[str, str]:
        """Collect the k/v attributes of the <tag> children of `elem`."""
        return {
            child.attrib["k"]: child.attrib["v"]
            for child in elem
            if child.tag == "tag"
        }

    @classmethod
    def from_xml(cls, path: Path) -> "OSMData":
        """
        Load the data set from an OSM XML file.

        The direct children of the document root are converted into the
        structure accepted by `from_dict`; children other than "bounds",
        "node", "way" and "relation" are ignored.

        :param path: XML file to parse
        """
        root = etree.parse(str(path)).getroot()
        structure = {"elements": []}
        from tqdm import tqdm

        for elem in tqdm(root):
            kind = elem.tag
            if kind == "bounds":
                structure["bounds"] = {
                    k: float(elem.attrib[k])
                    for k in ("minlon", "minlat", "maxlon", "maxlat")
                }
            elif kind in {"node", "way", "relation"}:
                # Fields shared by the three kinds of elements.
                item = {
                    "type": kind,
                    "id": int(elem.attrib["id"]),
                    "visible": elem.attrib.get("visible"),
                    "tags": cls._xml_tags(elem),
                }
                if kind == "node":
                    item["lat"] = float(elem.attrib["lat"])
                    item["lon"] = float(elem.attrib["lon"])
                elif kind == "way":
                    item["nodes"] = [
                        int(x.attrib["ref"]) for x in elem if x.tag == "nd"
                    ]
                else:
                    item["members"] = [
                        {
                            "type": x.attrib["type"],
                            "ref": int(x.attrib["ref"]),
                            "role": x.attrib["role"],
                        }
                        for x in elem
                        if x.tag == "member"
                    ]
                structure["elements"].append(item)
            # Everything needed was copied out of the element; presumably it
            # is reset here to limit memory use on large files.
            elem.clear()
        del root
        return cls.from_dict(structure)

    @classmethod
    def from_file(cls, path: Path) -> "OSMData":
        """
        Load the data set from a file, choosing the parser by extension.

        :param path: ".json" file, or ".osm" / ".xml" file; the extension is
            matched case-insensitively
        :raises ValueError: if the extension is not one of the above
        """
        ext = path.suffix.lower()
        if ext == ".json":
            return cls.from_json(path)
        if ext in {".osm", ".xml"}:
            return cls.from_xml(path)
        raise ValueError(f"Unknown extension for {path}")

    def add_node(self, node: OSMNode):
        """
        Register a node under its id.

        :raises ValueError: if a node with the same id is already stored
        """
        if node.id_ in self.nodes:
            raise ValueError(f"Node with duplicate id {node.id_}.")
        self.nodes[node.id_] = node

    def add_way(self, way: OSMWay):
        """
        Register a way under its id.

        :raises ValueError: if a way with the same id is already stored
        """
        if way.id_ in self.ways:
            raise ValueError(f"Way with duplicate id {way.id_}.")
        self.ways[way.id_] = way

    def add_relation(self, relation: OSMRelation):
        """
        Register a relation under its id.

        :raises ValueError: if a relation with the same id is already stored
        """
        if relation.id_ in self.relations:
            raise ValueError(f"Relation with duplicate id {relation.id_}.")
        self.relations[relation.id_] = relation

    def add_xy_to_nodes(self, proj: Projection):
        """
        Project the coordinates of every stored node and store the result in
        its `xy` attribute.

        When the projection has bounds, only the nodes reported as inside
        them are projected; the other nodes get an `xy` of zeros. A message
        is printed when fewer than 90% of the nodes are inside.

        :param proj: projection providing `bounds` and `project`
        """
        nodes = list(self.nodes.values())
        if not nodes:
            return
        geos = np.stack([n.geo for n in nodes], 0)
        if proj.bounds is not None:
            # Presumably a boolean mask with one entry per node; it is used
            # both as a ratio and as an index below.
            valid = proj.bounds.contains(geos)
            if valid.mean() < 0.9:
                print("Many nodes are out of the projection bounds.")
            xys = np.zeros_like(geos)
            xys[valid] = proj.project(geos[valid])
        else:
            xys = proj.project(geos)
        for xy, node in zip(xys, nodes):
            node.xy = xy
| |
|