File size: 6,353 Bytes
2cbfbdf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3aae782
 
 
 
 
 
 
 
 
 
 
 
 
 
b8765fa
3aae782
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cbfbdf
 
28b341e
2cbfbdf
 
5614af4
 
2cbfbdf
 
5614af4
 
2cbfbdf
 
5614af4
2cbfbdf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4aedabe
 
2cbfbdf
 
 
985d3d1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import os
import osmnx as ox
import geopandas as gpd
import gradio as gr
from datetime import datetime
import shutil
import re
import pandas as pd

# Street-term -> abbreviation table. "Nineth" is kept alongside the correct
# spelling "Ninth" so input that relied on the old (misspelled) key still
# abbreviates the same way.
_ADDRESS_ABBREVIATIONS = {
    "Street": "St", "Avenue": "Ave", "Boulevard": "Blvd", "Court": "Ct",
    "Drive": "Dr", "Road": "Rd", "Lane": "Ln", "Circle": "Cir",
    "Parkway": "Pkwy", "Highway": "Hwy", "Square": "Sq", "Terrace": "Ter",
    "Place": "Pl", "North": "N", "South": "S", "East": "E", "West": "W",
    "Northeast": "NE", "Northwest": "NW", "Southeast": "SE", "Southwest": "SW",
    "First": "1st", "Second": "2nd", "Third": "3rd", "Fourth": "4th", "Fifth": "5th",
    "Sixth": "6th", "Seventh": "7th", "Eighth": "8th", "Ninth": "9th", "Nineth": "9th",
    "Expressway": "Expy", "Freeway": "Fwy",
}

# One whole-word, case-insensitive pattern per term, compiled once at import
# time instead of on every call.
_ADDRESS_ABBREVIATION_RULES = [
    (re.compile(r'\b' + re.escape(term) + r'\b', flags=re.IGNORECASE), abbr)
    for term, abbr in _ADDRESS_ABBREVIATIONS.items()
]


def abbreviate_address(address):
    """Replace full street terms in *address* with their abbreviations.

    Matching is case-insensitive and restricted to whole words, so
    "Streetcar" is left alone while "street" becomes "St".

    Args:
        address: A street name string, or a list of street names, in which
            case only the first element is used.

    Returns:
        The abbreviated street name. An empty list yields an empty string.
    """
    if isinstance(address, list):
        if not address:
            # Nothing to abbreviate; avoid the IndexError on address[0].
            return ''
        address = address[0]

    for pattern, abbr in _ADDRESS_ABBREVIATION_RULES:
        address = pattern.sub(abbr, address)

    return address

def clean_street_name(name):
    """Return *name* stripped of punctuation and with whitespace collapsed.

    A list argument is reduced to its first element. The value is converted
    with ``str()`` before cleaning; word characters, whitespace and hyphens
    are kept, everything else is dropped, and runs of whitespace become a
    single space with no leading or trailing blanks.
    """
    first = name[0] if isinstance(name, list) else name
    # Drop every character that is not a word character, whitespace or hyphen.
    without_punctuation = re.sub(r'[^\w\s-]', '', str(first))
    tokens = without_punctuation.split()
    return ' '.join(tokens)

# Matches an Interstate reference such as "I 5", "I-5" or "I5" and captures
# the route designation. Requiring a digit after the "I" keeps state routes
# such as "IL 53" from being mistaken for Interstates.
_INTERSTATE_REF = re.compile(r'I[\s-]*(\d.*)')


def _highway_ref_label(row):
    """Return the route reference to store for one edge row.

    For motorways with a string ``ref`` the first ``;``-separated entry is
    used, and Interstate references are normalised to ``I-<number>``. Any
    other row gets its ``ref`` value back unchanged (first element when the
    value is a list, ``None`` when that list is empty).
    """
    ref = row.get('ref', '')
    highway = row.get('highway', '')

    # Tag values may arrive as lists; only the first entry is used.
    if isinstance(ref, list):
        ref = ref[0] if ref else None
    if isinstance(highway, list):
        highway = highway[0] if highway else None

    if highway == 'motorway' and isinstance(ref, str):
        primary = ref.split(';')[0].strip()
        interstate = _INTERSTATE_REF.match(primary)
        if interstate:
            return f"I-{interstate.group(1).strip()}"
        return primary

    return ref


def download_road_shapefile(coordinates_upper_left, coordinates_bottom_right):
    """Download the road network inside a bounding box and zip it as a shapefile.

    Args:
        coordinates_upper_left: ``(north, west)`` pair, i.e. (lat, lon) of the
            upper-left corner.
        coordinates_bottom_right: ``(south, east)`` pair, i.e. (lat, lon) of
            the bottom-right corner.

    Returns:
        The file name of the zip archive (``road_network_<timestamp>.zip``),
        relative to the current working directory.

    Raises:
        ValueError: If the downloaded edges carry no ``name`` column at all.

    Side effects:
        Creates the folder ``road_network_<timestamp>``, writes the shapefile
        into it and creates the zip archive next to it.
    """
    north, west = coordinates_upper_left
    south, east = coordinates_bottom_right

    current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_folder = f'road_network_{current_time}'

    # NOTE(review): this positional (north, south, east, west) order matches
    # the OSMnx 1.x signature; OSMnx 2.x expects a single bbox tuple. Confirm
    # against the installed version before upgrading.
    G = ox.graph_from_bbox(north, south, east, west, network_type='all')
    gdf_edges = ox.graph_to_gdfs(G, nodes=False, edges=True)

    if 'name' not in gdf_edges.columns:
        # Fail with a readable message instead of a bare KeyError('name').
        raise ValueError("No named roads found in the requested bounding box.")

    # Keep only named edges; copy so the column assignments below operate on
    # an independent frame rather than on a slice of the original.
    gdf_edges = gdf_edges[gdf_edges['name'].notnull()].copy()

    # The 'highway' column is overwritten with the (normalised) route ref.
    gdf_edges['highway'] = gdf_edges.apply(_highway_ref_label, axis=1)

    # Clean and abbreviate the display names
    gdf_edges['name'] = gdf_edges['name'].apply(abbreviate_address)
    gdf_edges['name'] = gdf_edges['name'].apply(clean_street_name)

    # Remove unwanted entries
    gdf_edges = gdf_edges[~gdf_edges['name'].str.contains('Cycletrack|Cycleway', case=False, na=False)]
    gdf_edges = gdf_edges[gdf_edges['name'].str.len() > 0]

    # Select final columns
    gdf_edges = gdf_edges[['name', 'highway', 'geometry']]
    gdf_edges = gdf_edges.to_crs(epsg=4326)

    # Save the file
    os.makedirs(output_folder, exist_ok=True)
    shapefile_path = os.path.join(output_folder, f'road_network_{current_time}.shp')
    gdf_edges.to_file(shapefile_path, driver='ESRI Shapefile', encoding='utf-8')

    zip_filename = f'{output_folder}.zip'
    shutil.make_archive(output_folder, 'zip', output_folder)

    return zip_filename

def process_coordinates(upper_left, bottom_right):
    """Parse two "lat,lon" strings and build the road-network archive.

    Args:
        upper_left: Text of the form "lat,lon" for the upper-left corner.
        bottom_right: Text of the form "lat,lon" for the bottom-right corner.

    Returns:
        The zip file name produced by ``download_road_shapefile`` on success.
        On any failure (unparseable text, inverted box, download error) the
        error message is returned as a string instead of being raised.
    """
    # NOTE(review): the interface below declares outputs="file", so these
    # message strings are handed to a file output - confirm they are shown
    # to the user as intended.
    try:
        top, left = map(float, upper_left.split(','))
        bottom, right = map(float, bottom_right.split(','))

        # The upper-left corner has to lie strictly north-west of the
        # bottom-right corner.
        if top <= bottom or left >= right:
            return "Invalid bounding box: Upper left corner must be north-west of the lower right corner."

        return download_road_shapefile((top, left), (bottom, right))
    except Exception as exc:
        return str(exc)

# Gradio Interface
# Two free-text boxes take the corner coordinates; the handler's return value
# is routed to a single file output.
iface = gr.Interface(
    title="Download Road Network Shapefile",
    description="Enter the coordinates to download a shapefile of the road network.",
    fn=process_coordinates,
    inputs=[
        gr.Textbox(
            label="Upper Left Coordinates (lat, lon)",
            placeholder="e.g., 37.7749,-122.4194",
        ),
        gr.Textbox(
            label="Bottom Right Coordinates (lat, lon)",
            placeholder="e.g., 37.7740,-122.4180",
        ),
    ],
    outputs="file",
    # Hide the page footer.
    css="footer {visibility: hidden}",
)

# Start the web app only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()