diff --git a/mapillary_tools/geotag/utils.py b/mapillary_tools/geotag/utils.py index 7be6debf..fcb854a4 100644 --- a/mapillary_tools/geotag/utils.py +++ b/mapillary_tools/geotag/utils.py @@ -12,11 +12,15 @@ import gpxpy -from .. import exiftool_read, geo, utils +from .. import exiftool_read, geo, telemetry, utils Track = T.List[geo.Point] LOG = logging.getLogger(__name__) +# GPS epoch start: January 6, 1980 (Unix timestamp). +# Any timestamp below this is not a valid GPS time. +_MIN_GPS_EPOCH_TIME = 315964800.0 + def parse_gpx(gpx_file: Path) -> list[Track]: with gpx_file.open("r") as f: @@ -29,15 +33,35 @@ def parse_gpx(gpx_file: Path) -> list[Track]: tracks.append([]) for point in segment.points: if point.time is not None: - tracks[-1].append( - geo.Point( - time=geo.as_unix_time(point.time), - lat=point.latitude, - lon=point.longitude, - alt=point.elevation, - angle=None, + unix_time = geo.as_unix_time(point.time) + if unix_time >= _MIN_GPS_EPOCH_TIME: + tracks[-1].append( + telemetry.CAMMGPSPoint( + time=unix_time, + lat=point.latitude, + lon=point.longitude, + alt=point.elevation, + angle=None, + time_gps_epoch=unix_time, + gps_fix_type=3 if point.elevation is not None else 2, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + ) + else: + tracks[-1].append( + geo.Point( + time=unix_time, + lat=point.latitude, + lon=point.longitude, + alt=point.elevation, + angle=None, + ) ) - ) return tracks diff --git a/mapillary_tools/serializer/description.py b/mapillary_tools/serializer/description.py index 9f50611b..d2fa88e6 100644 --- a/mapillary_tools/serializer/description.py +++ b/mapillary_tools/serializer/description.py @@ -25,7 +25,7 @@ import jsonschema -from .. import exceptions, geo +from .. import exceptions, geo, telemetry from ..types import ( BaseSerializer, describe_error_metadata, @@ -196,6 +196,10 @@ class ErrorDescription(TypedDict, total=False): "type": ["number", "null"], "description": "Camera angle of the track point, in degrees. If null, the angle will be interpolated", }, + { + "type": ["number", "null"], + "description": "GPS epoch time of the track point, in seconds. If present, used as the authoritative timestamp", + }, ], }, }, @@ -509,18 +513,45 @@ def _from_video_desc(cls, desc: VideoDescription) -> VideoMetadata: class PointEncoder: @classmethod def encode(cls, p: geo.Point) -> T.Sequence[float | int | None]: - entry = [ + entry: list[float | int | None] = [ int(p.time * 1000), round(p.lon, _COORDINATES_PRECISION), round(p.lat, _COORDINATES_PRECISION), round(p.alt, _ALTITUDE_PRECISION) if p.alt is not None else None, round(p.angle, _ANGLE_PRECISION) if p.angle is not None else None, ] + gps_epoch_time = p.get_gps_epoch_time() + if gps_epoch_time is not None: + entry.append(gps_epoch_time) return entry @classmethod def decode(cls, entry: T.Sequence[T.Any]) -> geo.Point: - time_ms, lon, lat, alt, angle = entry + if len(entry) >= 6 and entry[5] is not None: + time_ms, lon, lat, alt, angle, time_gps_epoch = ( + entry[0], + entry[1], + entry[2], + entry[3], + entry[4], + entry[5], + ) + return telemetry.CAMMGPSPoint( + time=time_ms / 1000, + lat=lat, + lon=lon, + alt=alt, + angle=angle, + time_gps_epoch=time_gps_epoch, + gps_fix_type=3 if alt is not None else 2, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + time_ms, lon, lat, alt, angle = entry[0], entry[1], entry[2], entry[3], entry[4] return geo.Point(time=time_ms / 1000, lon=lon, lat=lat, alt=alt, angle=angle) diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index b4866707..f514229b 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -302,10 +302,33 @@ def prepare_camm_info( camm_info.gps.append(point) elif isinstance(point, telemetry.GPSPoint): - # There is no proper CAMM entry for GoPro GPS - if camm_info.mini_gps is None: - camm_info.mini_gps = [] - camm_info.mini_gps.append(point) + # Convert GPSPoint to CAMMGPSPoint if it has a valid epoch_time, + # so the GPS timestamp is preserved in the CAMM type 6 entry + if point.epoch_time is not None and point.epoch_time > 0: + camm_point = telemetry.CAMMGPSPoint( + time=point.time, + lat=point.lat, + lon=point.lon, + alt=point.alt, + angle=point.angle, + time_gps_epoch=point.epoch_time, + gps_fix_type=point.fix.value + if point.fix is not None + else (3 if point.alt is not None else 2), + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ) + if camm_info.gps is None: + camm_info.gps = [] + camm_info.gps.append(camm_point) + else: + if camm_info.mini_gps is None: + camm_info.mini_gps = [] + camm_info.mini_gps.append(point) elif isinstance(point, geo.Point): if camm_info.mini_gps is None: diff --git a/schema/image_description_schema.json b/schema/image_description_schema.json index 2172036f..ce294b4a 100644 --- a/schema/image_description_schema.json +++ b/schema/image_description_schema.json @@ -34,6 +34,13 @@ "null" ], "description": "Camera angle of the track point, in degrees. If null, the angle will be interpolated" + }, + { + "type": [ + "number", + "null" + ], + "description": "GPS epoch time of the track point, in seconds. If present, used as the authoritative timestamp" } ] } diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index 40076aa6..5f50b26c 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -351,3 +351,311 @@ def test_build_and_parse3(): ) x = encode_decode_empty_camm_mp4(metadata) assert [] == x.points + + +def test_build_and_parse_gpx_sourced_camm_gps_points(): + """Test CAMMGPSPoint objects as created from GPX data (zeroed accuracy/velocity).""" + points = [ + telemetry.CAMMGPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + time_gps_epoch=1706000000.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + telemetry.CAMMGPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + time_gps_epoch=1706000001.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + telemetry.CAMMGPSPoint( + time=2.0, + lat=37.7751, + lon=-122.4196, + alt=12.0, + angle=None, + time_gps_epoch=1706000002.0, + gps_fix_type=3, + horizontal_accuracy=0.0, + vertical_accuracy=0.0, + velocity_east=0.0, + velocity_north=0.0, + velocity_up=0.0, + speed_accuracy=0.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.CAMM, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + # Verify points round-trip with time_gps_epoch preserved + assert len(x.points) == 3 + for original, decoded in zip(points, x.points): + assert isinstance(decoded, telemetry.CAMMGPSPoint) + decoded_camm = T.cast(telemetry.CAMMGPSPoint, decoded) + assert abs(original.time_gps_epoch - decoded_camm.time_gps_epoch) < 10e-6 + assert abs(original.lat - decoded_camm.lat) < 10e-6 + assert abs(original.lon - decoded_camm.lon) < 10e-6 + + +def test_prepare_camm_info_gpspoint_with_epoch_time(): + """GPSPoint with valid epoch_time should be converted to CAMMGPSPoint and routed to camm_info.gps.""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_2D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # Should be routed to gps (type 6), not mini_gps (type 5) + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + assert camm_info.mini_gps is None + + # Verify conversion preserved fields + for original, converted in zip(points, camm_info.gps): + assert isinstance(converted, telemetry.CAMMGPSPoint) + assert converted.lat == original.lat + assert converted.lon == original.lon + assert converted.alt == original.alt + assert converted.time == original.time + assert converted.time_gps_epoch == original.epoch_time + + # Verify fix type was correctly converted from GPSFix enum + assert camm_info.gps[0].gps_fix_type == 3 # FIX_3D.value + assert camm_info.gps[1].gps_fix_type == 2 # FIX_2D.value + + +def test_prepare_camm_info_gpspoint_without_epoch_time(): + """GPSPoint without epoch_time should remain in mini_gps (type 5).""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=None, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=0, + fix=telemetry.GPSFix.FIX_2D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # Should stay in mini_gps (type 5) + assert camm_info.gps is None + assert camm_info.mini_gps is not None + assert len(camm_info.mini_gps) == 2 + + +def test_prepare_camm_info_gpspoint_no_fix(): + """GPSPoint with epoch_time but fix=None should infer fix type from altitude.""" + point_with_alt = telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=None, + precision=None, + ground_speed=None, + ) + point_without_alt = telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=None, + angle=None, + epoch_time=1706000001.0, + fix=None, + precision=None, + ground_speed=None, + ) + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=[point_with_alt, point_without_alt], + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + # With altitude -> 3D fix + assert camm_info.gps[0].gps_fix_type == 3 + # Without altitude -> 2D fix + assert camm_info.gps[1].gps_fix_type == 2 + + +def test_prepare_camm_info_mixed_point_types(): + """Test that mixed point types are correctly routed.""" + points: T.List[geo.Point] = [ + # CAMMGPSPoint -> gps + telemetry.CAMMGPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + time_gps_epoch=1706000000.0, + gps_fix_type=3, + horizontal_accuracy=1.0, + vertical_accuracy=2.0, + velocity_east=0.1, + velocity_north=0.2, + velocity_up=0.3, + speed_accuracy=0.5, + ), + # GPSPoint with epoch_time -> converted to CAMMGPSPoint -> gps + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + # GPSPoint without epoch_time -> mini_gps + telemetry.GPSPoint( + time=2.0, + lat=37.7751, + lon=-122.4196, + alt=12.0, + angle=None, + epoch_time=None, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + # geo.Point -> mini_gps + geo.Point( + time=3.0, + lat=37.7752, + lon=-122.4197, + alt=13.0, + angle=None, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.CAMM, + points=points, + ) + camm_info = uploader.VideoUploader.prepare_camm_info(metadata) + + # 2 points in gps (CAMMGPSPoint + converted GPSPoint) + assert camm_info.gps is not None + assert len(camm_info.gps) == 2 + assert camm_info.gps[0].time_gps_epoch == 1706000000.0 + assert camm_info.gps[1].time_gps_epoch == 1706000001.0 + + # 2 points in mini_gps (GPSPoint without epoch + geo.Point) + assert camm_info.mini_gps is not None + assert len(camm_info.mini_gps) == 2 + + +def test_prepare_camm_info_gpspoint_roundtrip(): + """GPSPoint with epoch_time should round-trip through CAMM encode/decode with timestamp preserved.""" + points = [ + telemetry.GPSPoint( + time=0.0, + lat=37.7749, + lon=-122.4194, + alt=10.0, + angle=None, + epoch_time=1706000000.0, + fix=telemetry.GPSFix.FIX_3D, + precision=1.5, + ground_speed=5.0, + ), + telemetry.GPSPoint( + time=1.0, + lat=37.7750, + lon=-122.4195, + alt=11.0, + angle=None, + epoch_time=1706000001.0, + fix=telemetry.GPSFix.FIX_3D, + precision=2.0, + ground_speed=6.0, + ), + ] + metadata = types.VideoMetadata( + Path(""), + filetype=types.FileType.GOPRO, + points=points, + ) + x = encode_decode_empty_camm_mp4(metadata) + + # Should come back as CAMMGPSPoint with epoch time preserved + assert len(x.points) == 2 + for original, decoded in zip(points, x.points): + assert isinstance(decoded, telemetry.CAMMGPSPoint) + decoded_camm = T.cast(telemetry.CAMMGPSPoint, decoded) + assert abs(original.epoch_time - decoded_camm.time_gps_epoch) < 10e-6 + assert abs(original.lat - decoded_camm.lat) < 10e-6 + assert abs(original.lon - decoded_camm.lon) < 10e-6