Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 120 additions & 16 deletions mapillary_tools/blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import dataclasses
import datetime
import json
import logging
import re
Expand Down Expand Up @@ -120,25 +121,64 @@ def _extract_camera_model_from_cprt(cprt_bytes: bytes) -> str:
return ""


def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
def _compute_timezone_offset_from_rmc(
epoch_sec: float, message: pynmea2.NMEASentence
) -> float | None:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
[GPSPoint(time=1623057074.211, lat=51.150436666666664, lon=-114.03067833333333, alt=1097.36, angle=None, epoch_time=1623057074.211, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]
Compute timezone offset from an RMC message which has full date+time.

>>> list(_parse_gps_box(b"[1629874404069]$GNGGA,175322.00,3244.53126,N,11710.97811,W,1,12,0.84,17.4,M,-34.0,M,,*45"))
[GPSPoint(time=1629874404.069, lat=32.742187666666666, lon=-117.1829685, alt=17.4, angle=None, epoch_time=1629874404.069, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]
Returns the offset to add to camera epoch to get correct UTC time,
or None if this message doesn't have the required datetime.
"""
if (
message.sentence_type != "RMC"
or not hasattr(message, "datetime")
or not message.datetime
):
return None

>>> list(_parse_gps_box(b"[1629874404069]$GNGLL,4404.14012,N,12118.85993,W,001037.00,A,A*67"))
[GPSPoint(time=1629874404.069, lat=44.069002, lon=-121.31433216666667, alt=None, angle=None, epoch_time=1629874404.069, fix=None, precision=None, ground_speed=None)]
correct_epoch = message.datetime.replace(tzinfo=datetime.timezone.utc).timestamp()
# Rounding needed to avoid floating point precision issues
return round(correct_epoch - epoch_sec, 3)

>>> list(_parse_gps_box(b"[1629874404069]$GNRMC,001031.00,A,4404.13993,N,12118.86023,W,0.146,,100117,,,A*7B"))
[GPSPoint(time=1629874404.069, lat=44.06899883333333, lon=-121.31433716666666, alt=None, angle=None, epoch_time=1629874404.069, fix=None, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
def _compute_timezone_offset_from_time_only(
epoch_sec: float, message: pynmea2.NMEASentence
) -> float | None:
"""
points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}
Compute timezone offset from GGA/GLL which only have time (no date).

Uses the date from camera epoch and replaces the time with NMEA time assuming camera date is correct.
Handles day boundary when camera and GPS times differ by more than 12 hours.
"""
if not hasattr(message, "timestamp") or not message.timestamp:
return None

camera_dt = datetime.datetime.fromtimestamp(epoch_sec, tz=datetime.timezone.utc)

nmea_time = message.timestamp
corrected_dt = camera_dt.replace(
hour=nmea_time.hour,
minute=nmea_time.minute,
second=nmea_time.second,
microsecond=getattr(nmea_time, "microsecond", 0),
)
# Handle day boundary e.g. camera time is 23:00, GPS time is 01:00 or vice versa
camera_secs = camera_dt.hour * 3600 + camera_dt.minute * 60 + camera_dt.second
nmea_secs = nmea_time.hour * 3600 + nmea_time.minute * 60 + nmea_time.second
if camera_secs - nmea_secs > 12 * 3600:
corrected_dt += datetime.timedelta(days=1)
elif nmea_secs - camera_secs > 12 * 3600:
corrected_dt -= datetime.timedelta(days=1)

# Rounding needed to avoid floating point precision issues
return round(corrected_dt.timestamp() - epoch_sec, 3)


def _parse_nmea_lines(
gps_data: bytes,
) -> T.Iterator[tuple[int, pynmea2.NMEASentence]]:
"""Parse NMEA lines from GPS data, yielding (epoch_ms, message) tuples."""
for line_bytes in gps_data.splitlines():
match = NMEA_LINE_REGEX.match(line_bytes)
if match is None:
Expand All @@ -162,18 +202,82 @@ def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
continue

epoch_ms = int(match.group(1))
yield epoch_ms, message


def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
[GPSPoint(time=1623097530.0, lat=51.150436666666664, lon=-114.03067833333333, alt=1097.36, angle=None, epoch_time=1623097530.0, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNGGA,175322.00,3244.53126,N,11710.97811,W,1,12,0.84,17.4,M,-34.0,M,,*45"))
[GPSPoint(time=1629914002.0, lat=32.742187666666666, lon=-117.1829685, alt=17.4, angle=None, epoch_time=1629914002.0, fix=<GPSFix.FIX_3D: 3>, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNGLL,4404.14012,N,12118.85993,W,001037.00,A,A*67"))
[GPSPoint(time=1629850237.0, lat=44.069002, lon=-121.31433216666667, alt=None, angle=None, epoch_time=1629850237.0, fix=None, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1629874404069]$GNRMC,001031.00,A,4404.13993,N,12118.86023,W,0.146,,100117,,,A*7B"))
[GPSPoint(time=1484007031.0, lat=44.06899883333333, lon=-121.31433716666666, alt=None, angle=None, epoch_time=1484007031.0, fix=None, precision=None, ground_speed=None)]

>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
"""
timezone_offset: float | None = None
parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = []
first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None

# First pass: collect parsed_lines and compute timezone offset from the first valid RMC message
for epoch_ms, message in _parse_nmea_lines(gps_data):
# Rounding needed to avoid floating point precision issues
epoch_sec = round(epoch_ms / 1000, 3)
parsed_lines.append((epoch_sec, message))
if timezone_offset is None and message.sentence_type == "RMC":
if hasattr(message, "is_valid") and message.is_valid:
timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
if timezone_offset is not None:
LOG.debug(
"Computed timezone offset %.1fs from RMC (%s %s)",
timezone_offset,
message.datestamp,
message.timestamp,
)
# Track first valid GGA/GLL for fallback
if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
if hasattr(message, "is_valid") and message.is_valid:
first_valid_gga_gll = (epoch_sec, message)

# Fallback: if no RMC found, try GGA/GLL (less reliable - no date info)
if timezone_offset is None and first_valid_gga_gll is not None:
epoch_sec, message = first_valid_gga_gll
timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
if timezone_offset is not None:
LOG.debug(
"Computed timezone offset %.1fs from %s (fallback, no date info)",
timezone_offset,
message.sentence_type,
)

# If no offset could be determined, use 0 (camera clock assumed correct)
if timezone_offset is None:
timezone_offset = 0.0

points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}

# Second pass: apply offset to all GPS points
for epoch_sec, message in parsed_lines:
corrected_epoch = round(epoch_sec + timezone_offset, 3)

# https://tavotech.com/gps-nmea-sentence-structure/
if message.sentence_type in ["GGA"]:
if not message.is_valid:
continue
point = telemetry.GPSPoint(
time=epoch_ms / 1000,
time=corrected_epoch,
lat=message.latitude,
lon=message.longitude,
alt=message.altitude,
angle=None,
epoch_time=epoch_ms / 1000,
epoch_time=corrected_epoch,
fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None,
precision=None,
ground_speed=None,
Expand All @@ -184,12 +288,12 @@ def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
if not message.is_valid:
continue
point = telemetry.GPSPoint(
time=epoch_ms / 1000,
time=corrected_epoch,
lat=message.latitude,
lon=message.longitude,
alt=None,
angle=None,
epoch_time=epoch_ms / 1000,
epoch_time=corrected_epoch,
fix=None,
precision=None,
ground_speed=None,
Expand Down
150 changes: 143 additions & 7 deletions tests/unit/test_blackvue_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def test_parse_points():

[1623057130258]$GPGGA,201205.00,3853.16949,N,07659.54604,W,2,10,0.82,7.7,M,-34.7,M,,0000*6F

[1623057132256]$GPRMC,201208.00,A,3853.16949,N,07659.54604,W,5.849,284.43,070621,,,D*7B

# invalid line
[1623057130221]$GPGGA,**&^%$%$&(&(*(&&(^^*^*^^*&^&*))))

Expand All @@ -52,27 +54,38 @@ def test_parse_points():
lon=-76.992434,
alt=None,
angle=None,
epoch_time=1623057129.256,
epoch_time=1623096725,
fix=None,
precision=None,
ground_speed=None,
),
telemetry.GPSPoint(
time=3.0,
lat=38.88615816666667,
lon=-76.992434,
alt=None,
angle=None,
epoch_time=1623096728,
fix=None,
precision=None,
ground_speed=None,
)
),
],
make="BlackVue",
model="",
)


def test_gpspoint_gga():
gps_data = b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"
gps_data = b"[1623057074211]$GPGGA,202530.25,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*66"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
assert point.time == 1623057074.211
assert point.time == 1623097530.25
assert point.lat == 51.150436666666664
assert point.lon == -114.03067833333333
assert point.epoch_time == 1623057074.211
assert point.epoch_time == 1623097530.25
assert point.fix == telemetry.GPSFix.FIX_3D


Expand All @@ -82,7 +95,130 @@ def test_gpspoint_gll():

assert len(points) == 1
point = points[0]
assert point.time == 1629874404.069
assert point.time == 1629850237
assert point.lat == 44.069002
assert point.lon == -121.31433216666667
assert point.epoch_time == 1629874404.069
assert point.epoch_time == 1629850237


def test_timezone_offset_from_rmc():
"""
Test timezone correction when camera clock is in local time (GMT-3).

Camera epoch: 1637762688000 ms = 2021-11-24 14:04:48 (local time)
RMC GPS time: 11:04:48 on 2021-11-24 UTC (correct UTC from satellites)
Expected offset: 3 hours = 10800 seconds
Corrected epoch: 1637762688 - 10800 = 1637751888 = 2021-11-24 11:04:48 UTC
"""
# Camera shows 14:04:48 local, GPS shows 11:04:48 UTC -> 3 hour offset (GMT+3)
gps_data = b"[1637762688000]$GPRMC,110448.65,A,3853.16949,N,07659.54604,W,5.849,284.43,241121,,,D*7E"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
expected_epoch = 1637751888.65
assert point.time == expected_epoch
assert point.epoch_time == expected_epoch


def test_timezone_offset_applied_to_all_points():
"""
Test that the same timezone offset is applied to all GPS points.
"""
# Two RMC messages with 1 second apart in camera time
# Camera is 3 hours ahead (GMT+3), GPS shows correct UTC
gps_data = b"""
[1637762688000]$GPRMC,110448.00,A,3853.16949,N,07659.54604,W,5.849,284.43,241121,,,D*7D

[1637762689000]$GPRMC,110449.00,A,3853.16950,N,07659.54605,W,5.850,284.44,241121,,,D*7A
"""
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 2
# Both points should have 10800s subtracted
assert points[0].time == 1637751888
assert points[1].time == 1637751889
# Time difference between points should be preserved
assert points[1].time - points[0].time == 1.0


def test_timezone_offset_zero_when_clock_correct():
"""
Test that no offset is applied when camera clock matches GPS time.
"""
# Camera epoch matches GPS time
# epoch 1623057125000 = 2021-06-07 09:12:05 UTC
gps_data = b"[1623057125000]$GPRMC,091205.00,A,3853.16949,N,07659.54604,W,5.849,284.43,070621,,,D*7D"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
assert point.time == 1623057125.0
assert point.epoch_time == 1623057125.0


def test_timezone_offset_fallback_gga():
"""
Test timezone correction fallback using GGA when no RMC is available.

Uses time-only correction with day boundary handling.
"""
# Camera shows 14:04:48 local, GGA shows 11:04:48 UTC -> 3 hour offset
# No RMC message, so fallback to GGA
gps_data = b"[1637762688000]$GPGGA,110448.00,3853.16949,N,07659.54604,W,2,10,0.82,7.7,M,-34.7,M,,0000*63"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
point = points[0]
expected_epoch = 1637751888
assert point.time == expected_epoch
assert point.epoch_time == expected_epoch


def test_gga_day_boundary_nmea_next_day():
"""
Test GGA day boundary: NMEA time is on the next day relative to camera.

Scenario: Camera is in a negative timezone (e.g., GMT-2).
Camera epoch: 1637794800 = 2021-11-24 23:00:00 (local time)
GGA GPS time: 01:00:00 (correct UTC, next day = 2021-11-25 01:00:00)
"""
gps_data = b"[1637794800000]$GPGGA,010000.00,3853.16949,N,07659.54604,W,2,10,0.82,7.7,M,-34.7,M,,0000*6A"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
expected_epoch = 1637802000
assert points[0].time == expected_epoch


def test_gga_day_boundary_nmea_previous_day():
"""
Test GGA day boundary: NMEA time is on the previous day relative to camera.

Scenario: Camera is in a positive timezone (e.g., GMT+2).
Camera epoch: 1637802000 = 2021-11-25 01:00:00 (local time)
GGA GPS time: 23:00:00 (correct UTC, prev day = 2021-11-24 23:00:00)
"""
gps_data = b"[1637802000000]$GPGGA,230000.00,3853.16949,N,07659.54604,W,2,10,0.82,7.7,M,-34.7,M,,0000*6A"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
expected_epoch = 1637794800
assert points[0].time == expected_epoch


def test_gga_no_day_boundary_within_12_hours():
"""
Test GGA with time difference within 12 hours - no day boundary adjustment.

Camera is in GMT-3. Camera shows 14:00 local, actual UTC is 11:00 (same day).
"""
# epoch 1637751600 = 2021-11-24 11:00:00 UTC
# But camera clock shows 14:00 local, stored as epoch for 14:00 UTC = 1637762400
gps_data = b"[1637762400000]$GPGGA,110000.00,3853.16949,N,07659.54604,W,2,10,0.82,7.7,M,-34.7,M,,0000*6B"
points = blackvue_parser._parse_gps_box(gps_data)

assert len(points) == 1
# 3 hour offset between 11:00 and 14:00 -> no day adjustment
expected_epoch = 1637762400.0 - 10800.0
assert points[0].time == expected_epoch