Spaces:
Running
Running
import time | |
from typing import Dict, Optional, List | |
import undetected_chromedriver as uc | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from config import MAPCRUNCH_URL, SELECTORS, DATA_COLLECTION_CONFIG | |
class MapCrunchController: | |
def __init__(self, headless: bool = False): | |
# Try to initialize ChromeDriver with version 137 (your current Chrome version) | |
try: | |
# Create fresh ChromeOptions for first attempt | |
options = uc.ChromeOptions() | |
options.add_argument("--no-sandbox") | |
options.add_argument("--disable-dev-shm-usage") | |
options.add_argument("--disable-gpu") | |
options.add_argument("--window-size=1920,1080") | |
options.add_argument("--disable-web-security") | |
options.add_argument("--disable-features=VizDisplayCompositor") | |
options.add_argument("--disable-blink-features=AutomationControlled") | |
if headless: | |
options.add_argument("--headless=new") | |
self.driver = uc.Chrome(options=options, use_subprocess=True, version_main=137) | |
print("β ChromeDriver initialized successfully with version 137") | |
except Exception as e: | |
print(f"Failed with version 137: {e}") | |
try: | |
# Create fresh ChromeOptions for fallback attempt | |
options = uc.ChromeOptions() | |
options.add_argument("--no-sandbox") | |
options.add_argument("--disable-dev-shm-usage") | |
options.add_argument("--disable-gpu") | |
options.add_argument("--window-size=1920,1080") | |
options.add_argument("--disable-web-security") | |
options.add_argument("--disable-features=VizDisplayCompositor") | |
options.add_argument("--disable-blink-features=AutomationControlled") | |
if headless: | |
options.add_argument("--headless=new") | |
# Fallback to auto-detection | |
self.driver = uc.Chrome(options=options, use_subprocess=True) | |
print("β ChromeDriver initialized successfully with auto-detection") | |
except Exception as e2: | |
print(f"Failed with auto-detection: {e2}") | |
raise Exception(f"Could not initialize ChromeDriver. Please update Chrome or check compatibility. Errors: {e}, {e2}") | |
self.wait = WebDriverWait(self.driver, 10) | |
# Inject browser detection bypass script | |
try: | |
self.driver.execute_cdp_cmd( | |
"Page.addScriptToEvaluateOnNewDocument", | |
{ | |
"source": """ | |
Object.defineProperty(window, 'badBrowser', { | |
value: 0, | |
writable: false, | |
configurable: false | |
}); | |
window.alert = function() {}; | |
Object.defineProperty(navigator, 'webdriver', { | |
get: () => undefined | |
}); | |
""" | |
}, | |
) | |
except Exception as e: | |
print(f"Warning: Could not inject browser detection script: {e}") | |
# Load MapCrunch | |
for retry in range(3): | |
try: | |
self.driver.get(MAPCRUNCH_URL) | |
time.sleep(3) | |
print("β MapCrunch loaded successfully") | |
break | |
except Exception as e: | |
if retry == 2: | |
raise e | |
print(f"Failed to load MapCrunch, retry {retry + 1}/3: {e}") | |
time.sleep(2) | |
def setup_clean_environment(self): | |
""" | |
Minimal environment setup using hideLoc() and hiding major UI. | |
""" | |
self.driver.execute_script("if(typeof hideLoc === 'function') hideLoc();") | |
self.driver.execute_script(""" | |
const menu = document.querySelector('#menu'); | |
if (menu) menu.style.display = 'none'; | |
const social = document.querySelector('#social'); | |
if (social) social.style.display = 'none'; | |
const googleImg = document.querySelector('img[alt="Google"]'); | |
if (googleImg && googleImg.parentElement) { | |
googleImg.parentElement.style.display = 'none'; | |
} | |
const topBar = document.querySelector('#topbar'); | |
if (topBar) topBar.style.display = 'none'; | |
const bottomBox = document.querySelector('#bottom-box'); | |
if (bottomBox) bottomBox.style.display = 'none'; | |
const infoFirstView = document.querySelector('#info-firstview'); | |
if (infoFirstView) infoFirstView.style.display = 'none'; | |
const controlsToHide = document.querySelectorAll('.gm-style-cc'); controlsToHide.forEach(el => { el.style.display = 'none'; }); | |
const keyboardButton = document.querySelector('button[aria-label="Keyboard shortcuts"]'); if (keyboardButton) { keyboardButton.style.display = 'none'; } | |
""") | |
def label_arrows_on_screen(self): | |
"""Overlays 'UP' and 'DOWN' labels on the navigation arrows.""" | |
try: | |
pov = self.driver.execute_script("return window.panorama.getPov();") | |
links = self.driver.execute_script("return window.panorama.getLinks();") | |
except Exception: | |
return | |
if not links or not pov: | |
return | |
current_heading = pov["heading"] | |
forward_link = None | |
backward_link = None | |
# This logic is identical to your existing `move` function | |
# to ensure stylistic and behavioral consistency. | |
min_forward_diff = 360 | |
for link in links: | |
diff = 180 - abs(abs(link["heading"] - current_heading) - 180) | |
if diff < min_forward_diff: | |
min_forward_diff = diff | |
forward_link = link | |
target_backward_heading = (current_heading + 180) % 360 | |
min_backward_diff = 360 | |
for link in links: | |
diff = 180 - abs(abs(link["heading"] - target_backward_heading) - 180) | |
if diff < min_backward_diff: | |
min_backward_diff = diff | |
backward_link = link | |
js_script = """ | |
document.querySelectorAll('.geobot-arrow-label').forEach(el => el.remove()); | |
document.querySelectorAll('path[data-geobot-modified]').forEach(arrow => { | |
arrow.setAttribute('transform', arrow.getAttribute('data-original-transform') || ''); | |
arrow.removeAttribute('data-geobot-modified'); | |
arrow.removeAttribute('data-original-transform'); | |
}); | |
const modifyAndLabelArrow = (panoId, labelText, color) => { | |
const arrowElement = document.querySelector(`path[pano="${panoId}"]`); | |
if (!arrowElement) return; | |
const originalTransform = arrowElement.getAttribute('transform') || ''; | |
arrowElement.setAttribute('data-original-transform', originalTransform); | |
arrowElement.setAttribute('transform', `${originalTransform} scale(1.8)`); | |
arrowElement.setAttribute('data-geobot-modified', 'true'); | |
const rect = arrowElement.getBoundingClientRect(); | |
const label = document.createElement('div'); | |
label.className = 'geobot-arrow-label'; | |
label.style.position = 'fixed'; | |
label.style.left = `${rect.left + rect.width / 2}px`; | |
label.style.top = `${rect.top - 45}px`; | |
label.style.transform = 'translateX(-50%)'; | |
label.style.padding = '5px 15px'; | |
label.style.backgroundColor = 'rgba(0, 0, 0, 0.8)'; | |
label.style.color = color; | |
label.style.borderRadius = '8px'; | |
label.style.fontSize = '28px'; | |
label.style.fontWeight = 'bold'; | |
label.style.zIndex = '99999'; | |
label.style.pointerEvents = 'none'; | |
label.innerText = labelText; | |
document.body.appendChild(label); | |
}; | |
const forwardPano = arguments[0]; | |
const backwardPano = arguments[1]; | |
if (forwardPano) { | |
modifyAndLabelArrow(forwardPano, 'UP', '#76FF03'); | |
} | |
if (backwardPano && backwardPano !== forwardPano) { | |
modifyAndLabelArrow(backwardPano, 'DOWN', '#F44336'); | |
} | |
""" | |
forward_pano = forward_link["pano"] if forward_link else None | |
backward_pano = backward_link["pano"] if backward_link else None | |
self.driver.execute_script(js_script, forward_pano, backward_pano) | |
time.sleep(0.2) | |
def get_available_actions(self) -> List[str]: | |
""" | |
Checks for movement links via JavaScript. | |
""" | |
base_actions = ["PAN_LEFT", "PAN_RIGHT", "GUESS"] | |
links = self.driver.execute_script("return window.panorama.getLinks();") | |
if links and len(links) > 0: | |
base_actions.extend(["MOVE_FORWARD", "MOVE_BACKWARD"]) | |
return base_actions | |
def get_current_address(self) -> Optional[str]: | |
try: | |
address_element = self.wait.until( | |
EC.visibility_of_element_located( | |
(By.CSS_SELECTOR, SELECTORS["address_element"]) | |
) | |
) | |
address_text = address_element.text.strip() | |
address_title = address_element.get_attribute("title") or "" | |
return ( | |
address_title | |
if len(address_title) > len(address_text) | |
else address_text | |
) | |
except Exception: | |
return "Stealth Mode" | |
def pan_view(self, direction: str, degrees: int = 45): | |
"""Pans the view using a direct JS call.""" | |
pov = self.driver.execute_script("return window.panorama.getPov();") | |
if direction == "left": | |
pov["heading"] -= degrees | |
elif direction == "right": | |
pov["heading"] += degrees | |
self.driver.execute_script("window.panorama.setPov(arguments[0]);", pov) | |
time.sleep(0.5) | |
def move(self, direction: str): | |
"""Moves by finding the best panorama link and setting it via JS.""" | |
pov = self.driver.execute_script("return window.panorama.getPov();") | |
links = self.driver.execute_script("return window.panorama.getLinks();") | |
if not links: | |
return | |
current_heading = pov["heading"] | |
best_link = None | |
if direction == "forward": | |
min_diff = 360 | |
for link in links: | |
diff = 180 - abs(abs(link["heading"] - current_heading) - 180) | |
if diff < min_diff: | |
min_diff = diff | |
best_link = link | |
elif direction == "backward": | |
target_heading = (current_heading + 180) % 360 | |
min_diff = 360 | |
for link in links: | |
diff = 180 - abs(abs(link["heading"] - target_heading) - 180) | |
if diff < min_diff: | |
min_diff = diff | |
best_link = link | |
if best_link: | |
self.driver.execute_script( | |
"window.panorama.setPano(arguments[0]);", best_link["pano"] | |
) | |
time.sleep(2.5) | |
def select_map_location_and_guess(self, lat: float, lon: float): | |
"""Minimalist guess confirmation.""" | |
self.driver.execute_script( | |
"document.querySelector('#bottom-box').style.display = 'block';" | |
) | |
self.wait.until( | |
EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["go_button"])) | |
).click() | |
time.sleep(0.5) | |
self.wait.until( | |
EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["confirm_button"])) | |
).click() | |
time.sleep(3) | |
def get_ground_truth_location(self) -> Optional[Dict[str, float]]: | |
"""Directly gets location from JS object.""" | |
return self.driver.execute_script("return window.loc;") | |
def click_go_button(self) -> bool: | |
self.wait.until( | |
EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["go_button"])) | |
).click() | |
time.sleep(DATA_COLLECTION_CONFIG.get("wait_after_go", 3)) | |
return True | |
def take_street_view_screenshot(self) -> Optional[bytes]: | |
pano_element = self.wait.until( | |
EC.presence_of_element_located( | |
(By.CSS_SELECTOR, SELECTORS["pano_container"]) | |
) | |
) | |
return pano_element.screenshot_as_png | |
def load_location_from_data(self, location_data: Dict) -> bool: | |
pano_id, pov = location_data.get("pano_id"), location_data.get("pov") | |
if pano_id and pov: | |
self.driver.execute_script( | |
"window.panorama.setPano(arguments[0]); window.panorama.setPov(arguments[1]);", | |
pano_id, | |
pov, | |
) | |
time.sleep(2) | |
return True | |
return False | |
def close(self): | |
if self.driver: | |
self.driver.quit() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.close() | |
def load_url(self, url): | |
"""Load a specific MapCrunch URL.""" | |
try: | |
self.driver.get(url) | |
time.sleep(2) # Wait for the page to load | |
return True | |
except Exception as e: | |
print(f"Error loading URL: {e}") | |
return False | |