Source code for co2calculator.distances

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Functions for obtaining the distance between given addresses."""

import os
import warnings
from pathlib import Path
from typing import Tuple, Union, Optional

import numpy as np
import openrouteservice
from dotenv import load_dotenv
from openrouteservice.directions import directions
from openrouteservice.geocode import pelias_search, pelias_structured
from pydantic import BaseModel, ValidationError, Extra, confloat
from thefuzz import fuzz
from thefuzz import process

from ._types import Kilometer
from .constants import (
    TransportationMode,
    AddressType,
    CountryCode2,
    CountryCode3,
    CountryName,
    IataAirportCode,
    DetourCoefficient,
    DetourConstant,
    RangeCategory,
    RoutingProfile,
)
from .data_handlers import Airports, EUTrainStations

load_dotenv()  # take environment variables from .env.

# Load environment vars (TODO: Use pydantic.BaseSettings)
ORS_API_KEY = os.environ.get("ORS_API_KEY")
# TODO: check if key exists or is valid
script_path = str(Path(__file__).parent)


class StructuredLocation(BaseModel, extra=Extra.forbid):
    address: Optional[str]
    locality: str
    country: Optional[Union[CountryCode2, CountryCode3, CountryName]]
    region: Optional[str]
    county: Optional[str]
    borough: Optional[str]
    postalcode: Optional[str]
    neighbourhood: Optional[str]


class TrainStation(BaseModel, extra=Extra.forbid):
    station_name: str
    country: CountryCode2


class Airport(BaseModel, extra=Extra.forbid):
    iata_code: IataAirportCode


class DistanceRequest(BaseModel):
    transportation_mode: TransportationMode
    start: Union[StructuredLocation, TrainStation, Airport]
    destination: Union[StructuredLocation, TrainStation, Airport]


class Coordinate(BaseModel):
    lat: confloat(ge=-90, le=90)
    long: confloat(ge=-180, le=180)
    lat_rad: confloat(ge=-np.pi / 2, le=np.pi / 2) = None
    long_rad: confloat(ge=-np.pi, le=np.pi) = None

    def deg2rad(self):
        self.lat_rad = np.deg2rad(self.lat)
        self.long_rad = np.deg2rad(self.long)


# Module's exceptions
class InvalidSpatialInput(Exception):
    """Raised when consumer inputs invalid spatial information"""


[docs] def haversine( lat_start: float, long_start: float, lat_dest: float, long_dest: float ) -> Kilometer: """Function to compute the distance as the crow flies between given locations :param lat_start: latitude of start :param long_start: Longitude of start :param lat_dest: Latitude of destination :param long_dest: Longitude of destination :type lat_start: float :type long_start: float :type lat_dest: float :type long_dest: float :return: Distance :rtype: Kilometer """ start = Coordinate(lat=lat_start, long=long_start) dest = Coordinate(lat=lat_dest, long=long_dest) # convert angles from degree to radians start.deg2rad() dest.deg2rad() # compute zeta a = ( np.sin((dest.lat_rad - start.lat_rad) / 2) ** 2 + np.cos(start.lat_rad) * np.cos(dest.lat_rad) * np.sin((dest.long_rad - start.long_rad) / 2) ** 2 ) c = 2 * np.arcsin(np.sqrt(a)) r = 6371 return c * r
def geocoding_airport_pelias( iata: IataAirportCode, ) -> Tuple[str, Tuple[float, float], str]: """Function to obtain the coordinates of an airport by the IATA code :param iata: IATA airport code :type iata: str :return: name, coordinates and country of the found airport :rtype: Tuple[str, Tuple[float, float], str] """ clnt = openrouteservice.Client(key=ORS_API_KEY) call = pelias_search(clnt, f"{iata} Airport") for feature in call["features"]: try: if feature["properties"]["addendum"]["osm"]["iata"] == iata: name = feature["properties"]["name"] geom = feature["geometry"]["coordinates"] country = feature["properties"]["country_a"] break except KeyError: # unfortunately, not all osm tags are available with geocoding, so osm entries might not be found and filter # for "aerodrome" tag not possible (could be done with ORS maybe?) if (feature["properties"]["confidence"] == 1) & ( feature["properties"]["match_type"] == "exact" ): name = feature["properties"]["name"] geom = feature["geometry"]["coordinates"] country = feature["properties"]["country_a"] break return name, geom, country
[docs] def geocoding_airport(iata: IataAirportCode) -> Tuple[str, Tuple[float, float], str]: """Function to obtain the coordinates of an airport by the IATA code :param iata: IATA airport code :type iata: str :return: name, coordinates and country of the found airport :rtype: Tuple[str, Tuple[float, float], str] """ df_airports = Airports().airports airport = Airport(iata_code=iata) name, lat, lon, country = ( df_airports[df_airports.iata_code == airport.iata_code][ ["name", "latitude_deg", "longitude_deg", "iso_country"] ] .values.flatten() .tolist() ) # coords is a string - convert to list of floats coords = [lon, lat] return name, coords, country
def geocoding(address): """Function to obtain coordinates for a given address :param address: Location/Address to be searched user should give address in the form: <address>, <locality>, <country> e.g. Hauptbahnhof, Heidelberg, Germany e.g. Im Neuenheimer Feld 348, Heidelberg, Germany e.g. Heidelberg, Germany :return: Name, country and coordinates of the found location """ clnt = openrouteservice.Client(key=ORS_API_KEY) call = pelias_search(clnt, address) for feature in call["features"]: name = feature["properties"]["name"] country = feature["properties"]["country"] coords = feature["geometry"]["coordinates"] break return name, country, coords
[docs] def geocoding_structured(loc_dict): """Function to obtain coordinates for a given address :param loc_dict: dictionary describing the location. .. code-block:: none The dictionary can have the keys: country: highest-level administrative divisions supported in a search. Full country name or two-/three-letter abbreviations supported e.g., Germany / "DE" / "DEU" region: first-level administrative divisions within countries, analogous to states and provinces in the US and Canada e.g., Delaware, Ontario, Ardennes, Baden-Württemberg county: administrative divisions between localities and regions e.g., Alb-Donau-Kreis locality: equivalent to what are commonly referred to as cities e.g., Bangkok, Caracas borough: mostly known in the context of NY, may exist in other cities like Mexico City e.g. Manhattan in NY, Iztapalapa in Mexico City postalcode: postal code; !! Not working in many countries !! address: street name, optionally also house number neighbourhood: vernacular geographic entities that may not necessarily be official administrative divisions but are important nonetheless e.g. Notting Hill in London, Le Marais in Paris :return: Name, country and coordinates of the found location """ clnt = openrouteservice.Client(key=ORS_API_KEY) location = StructuredLocation(**loc_dict) call = pelias_structured(clnt, **location.dict()) n_results = len(call["features"]) res = call["features"] assert n_results != 0, "No places found with these search parameters" if n_results == 0: raise Exception("No places found with these search parameters") # TODO: Validate response with a pydantic.BaseModel (`PeliasStructuredResponse`) # TODO: Unpack required data from response with a pydantic.BaseModel which we use internally # as Point of Interest (or similar, e.g., `PointOfInterest`) for feature in res: name = feature["properties"]["name"] country = feature["properties"]["country"] coords = feature["geometry"]["coordinates"] layer = feature["properties"]["layer"] if "locality" in loc_dict.keys() and "address" in loc_dict.keys(): if ( layer != "address" and layer != "locality" and layer != "street" ) and n_results > 1: warnings.warn( f"Data type not matching search ({layer} instead of address or locality. Skipping {name}, {coords}", stacklevel=2, ) continue confidence = feature["properties"]["confidence"] if confidence < 0.8: warnings.warn( f"Low confidence: {confidence:.1f} for result {name}, {coords}", stacklevel=2, ) break print( f"{n_results} location(s) found. Using this result: {name}, {country} (data type: {layer})" ) print("Coords: ", coords) # todo: check if to return res or not! return name, country, coords, res
[docs] def geocoding_train_stations(loc_dict): """Function to obtain coordinates for a given train station :param loc_dict: dictionary describing the location. .. code-block:: none The dictionary can have the keys: country: highest-level administrative divisions supported in a search. Only two-letter abbreviations supported e.g., 'DE' station_name: Name of the train station e.g., 'Heidelberg Hbf' :return: Name, country and coordinates of the found location """ eu_train_stations = EUTrainStations().stations station = TrainStation(**loc_dict) country_code = station.country countries_eu = eu_train_stations["country"].unique() if country_code not in countries_eu: warnings.warn( "The provided country is not within Europe. " "Please provide the address of the station instead of the station name for accurate results.", stacklevel=2, ) # filter stations by country stations_in_country = eu_train_stations[ eu_train_stations["country"] == country_code ] # use thefuzz to find best match choices = stations_in_country["slug"].values res_station_slug, score = process.extractOne( station.station_name, choices, scorer=fuzz.partial_ratio ) res_station = stations_in_country[stations_in_country["slug"] == res_station_slug] res_country, res_station_name = res_station[["country", "name"]].values[0] coords = (res_station.iloc[0]["latitude"], res_station.iloc[0]["longitude"]) return res_station_name, res_country, coords
def get_route(coords: list, profile: RoutingProfile = None) -> Kilometer: """Obtain the distance of a route between given waypoints using a given profile todo: check if coords may also be a tuple/array etc. :param coords: list of [lat,long] coordinates :param profile: driving-car, cycling-regular :type coords: list :type profile: str :return: distance of the route :rtype: Kilometer """ clnt = openrouteservice.Client(key=ORS_API_KEY) # profile may be: driving-car, cycling-regular if profile not in [RoutingProfile.CAR, RoutingProfile.CYCLING] or profile is None: profile = RoutingProfile.CAR warnings.warn( f"Warning! Specified profile not available or no profile passed.\n" f"Profile set to '{profile}' by default.", stacklevel=2, ) route = directions(clnt, coords, profile=profile) dist = ( route["routes"][0]["summary"]["distance"] / 1000 ) # divide by 1000, as we're working with distances in km return dist def get_route_ferry( coords: list, profile: RoutingProfile = None ) -> Tuple[Kilometer, Kilometer]: """Obtain the distance of a ferry route (and the total trip distance) between given waypoints todo: check if coords may also be a tuple/array etc. :param coords: list of [lat,long] coordinates :param profile: driving-car, foot-walking :type coords: list :type profile: str :return: distance of the ferry crossing, total distance :rtype: Kilometer, Kilometer """ # profile may be: driving-car, walking clnt = openrouteservice.Client(key=ORS_API_KEY) if profile not in [RoutingProfile.WALK, RoutingProfile.CAR] or profile is None: profile = RoutingProfile.WALK warnings.warn( f"Warning! Specified profile not available or no profile passed.\n" f"Profile set to '{profile}' by default.", stacklevel=2, ) res = directions(clnt, coords, profile=profile, extra_info=["waytype"]) """waytypes = {0: "Unknown", 1: "State Road", 2: "Road", 3: "Street", 4: "Path", 5: "Track", 6: "Cycleway", 7: "Footway", 8: "Steps", 9: "Ferry", 10: "Construction"}""" dist_per_waytype = res["routes"][0]["extras"]["waytypes"]["summary"] try: dist_ferry = [d["distance"] for d in dist_per_waytype if d["value"] == 9.0][ 0 ] / 1000 except IndexError: # todo: raise Error raise InvalidSpatialInput( "The generated route does not contain any ferry crossing. Are you sure about the waypoints?" ) dist_ferry = 0.0 total_dist = ( res["routes"][0]["summary"]["distance"] / 1000 ) # divide by 1000, as we're working with distances in km if (dist_ferry + 100) < total_dist and dist_ferry != 0.0: warnings.warn( """ "Total distance is much larger than ferry crossing. Your ferry route might not be contained in the database. If you are sure you entered the correct addresses of the ferry ports, consider entering the approximate ferry trip distance directly instead of the port addresses. """, UserWarning, stacklevel=2, ) return dist_ferry, total_dist def _apply_detour( distance: Kilometer, transportation_mode: TransportationMode ) -> Kilometer: """Function to apply specific detour parameters to a distance as the crow flies :param distance: Distance as the crow flies between location of departure and destination of a trip :param transportation_mode: Mode of transport used in the trip :type distance: Kilometer :type transportation_mode: str :return: Distance accounted for detour :rtype: Kilometer """ try: detour_coefficient = DetourCoefficient[transportation_mode.upper()] detour_constant = DetourConstant[transportation_mode.upper()] except KeyError: detour_coefficient = 1.0 detour_constant = 0.0 warnings.warn( f""" No detour coefficient or constant available for this transportation mode. Detour parameters are available for the following transportation modes: {list(DetourCoefficient)} Using detour_coefficient = {detour_coefficient} and detour_constant = {detour_constant}. """, stacklevel=2, ) distance_with_detour = detour_coefficient * distance + detour_constant return distance_with_detour def range_categories(distance: Kilometer) -> Tuple[RangeCategory, str]: """Function to categorize a trip according to the travelled distance :param distance: Distance travelled in km :type distance: Kilometer :return: Range category of the trip [very short haul, short haul, medium haul, long haul] Range description (i.e., what range of distances does to category correspond to) :rtype: tuple[RangeCategory, str] """ if distance < 0: raise ValueError("Distance must not be negative!") elif distance <= 500: range_cat = RangeCategory.VERY_SHORT_HAUL range_description = "below 500 km" elif distance <= 1500: range_cat = RangeCategory.SHORT_HAUL range_description = "500 to 1500 km" elif distance <= 4000: range_cat = RangeCategory.MEDIUM_HAUL range_description = "1500 to 4000 km" else: range_cat = RangeCategory.LONG_HAUL range_description = "above 4000 km" return range_cat, range_description def create_distance_request( start: dict, destination: dict, transportation_mode: TransportationMode, ) -> DistanceRequest: """Transform and validate the user input into a proper model for distance calculations Raises: - InvalidSpatialInput for validation error - InvalidSpatialInput for unknown transportation_mode :param start: Start of the trip :param destination: Destination of the trip :param transportation_mode: Mode of transport used in the trip :type start: Union[str, dict] :type destination: Union[str, dict] :type transportation_mode: TransportationMode :return: Request for distance calculation between two locations for a given transportation mode :rtype: DistanceRequest """ # Validate the spatial data wrt. the mode of transportation # And creates a DistanceRequest object containing either StructuredLocation, # TrainStation or Airport objects based on the user input address_type # for start and destination, if nothing is given assume StructuredLocation try: locations = [None, None] for i, o in enumerate([start, destination]): if isinstance(o, dict): if "address_type" in o: if o["address_type"] == AddressType.ADDRESS: del o["address_type"] locations[i] = StructuredLocation(**o) elif o["address_type"] == AddressType.TRAINSTATION: del o["address_type"] locations[i] = TrainStation(**o) elif o["address_type"] == AddressType.AIRPORT: del o["address_type"] locations[i] = Airport(iata_code=o["IATA"]) else: raise InvalidSpatialInput( f"unknown address type: '{o['address_type']}'" ) else: print( "No address type provided: ('address', 'trainstation' ,'airport'), assume address" ) locations[i] = StructuredLocation(**o) elif isinstance(o, str): locations[i] = StructuredLocation(locality=o) else: raise InvalidSpatialInput( f"start and destination must be either dict or string, not {type(o)}" ) return DistanceRequest( transportation_mode=transportation_mode, start=locations[0], destination=locations[1], ) except ValidationError as e: raise InvalidSpatialInput(e) def get_distance(request: DistanceRequest) -> Kilometer: """Get the distance between start and destination Raises: - InvalidSpatialInput if start and stop are malformed or None :param request: Request for distance calculation between two locations for a given transportation mode :type request: DistanceRequest :return: Distance :rtype: Kilometer """ # Calculate cords coords = [] # StructuredLocation, TrainStation, Airport based on the object class of # start and destination in the request for loc in [request.start, request.destination]: if isinstance(loc, StructuredLocation): _, _, loc_coords, _ = geocoding_structured(loc.dict()) elif isinstance(loc, TrainStation): _, _, loc_coords = geocoding_train_stations(loc.dict()) elif isinstance(loc, Airport): _, loc_coords, _ = geocoding_airport(loc.iata_code) else: raise Exception("Address Type not valid") coords.append(loc_coords) # TODO: Do we want to calculate the distance for bicycles and pedelecs same like for cars? if request.transportation_mode in [ TransportationMode.CAR, TransportationMode.MOTORBIKE, TransportationMode.PEDELEC, TransportationMode.BICYCLE, ]: return get_route(coords, RoutingProfile.CAR) elif request.transportation_mode in [ TransportationMode.TRAIN, TransportationMode.TRAM, TransportationMode.BUS, TransportationMode.PLANE, ]: distance = 0 # This could also be used for longer lists of cords (with via) for i in range(len(coords) - 1): # compute great circle distance between locations distance += haversine( coords[i][1], coords[i][0], coords[i + 1][1], coords[i + 1][0] ) return _apply_detour(distance, request.transportation_mode) if request.transportation_mode == TransportationMode.FERRY: # hardcoding not ideal, profile should be determined based on specified "seating type" distance, distance_total = get_route_ferry(coords, profile=RoutingProfile.WALK) # if "seating" is "Car passenger", the remaining distance should be calculated as car trip ... # TODO: implement this # remaining_distance = distance - distance_total return distance