Set probe name, public, and location_uuid on NTP metadata before probe_metadata insert.
Uses additional_metadata.geo_override when present (lat/lon/label). Otherwise resolves the remote
host, uses RFC1918/loopback defaults from env, or ip-api.com for public IPs (HTTP, no API key).
Source code in opensampl/helpers/geolocator.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 | def create_location(session: Session, geolocate_enabled: bool, ip_address: str, geo_override: dict) -> str | None:
"""
Set probe ``name``, ``public``, and ``location_uuid`` on NTP metadata before ``probe_metadata`` insert.
Uses ``additional_metadata.geo_override`` when present (lat/lon/label). Otherwise resolves the remote
host, uses RFC1918/loopback defaults from env, or ip-api.com for public IPs (HTTP, no API key).
"""
lat: float | None = None
lon: float | None = None
name: str | None = None
if geo_override.get("lat") is not None and geo_override.get("lon") is not None:
lat = float(geo_override["lat"])
lon = float(geo_override["lon"])
name = geo_override.get("name")
if geolocate_enabled and lat is None and lon is None:
ip_for_geo = ip_address
try:
ip_for_geo = socket.gethostbyname(ip_address)
except OSError as e:
logger.debug("Could not resolve {}: {}", ip_address, e)
if _is_private_or_loopback(ip_for_geo):
lat, lon = _default_lab_coords()
else:
geo = _lookup_geo_ipapi(ip_for_geo)
if geo:
lat, lon, _name = geo
name = name or _name
else:
lat, lon = _default_lab_coords()
loc_factory = TableFactory("locations", session=session)
loc = None
if name:
loc = loc_factory.find_existing({"name": name})
if loc is None:
if any(x is None for x in [lat, lon, name]):
logger.warning(
"Skipping location creation for {}: insufficient location data (name={!r}, lat={!r}, lon={!r})",
ip_address,
name,
lat,
lon,
)
return None
loc = loc_factory.write(
{"name": name, "lat": lat, "lon": lon, "public": True},
if_exists="ignore",
)
if loc:
return loc.uuid
return None
|