Skip to content

Commit

Permalink
feat: add IMU data in CAMM as experimental feature (#694)
Browse files Browse the repository at this point in the history
* update the interface

* write the streams to CAMM

* extract

* sort imports

* add MAPILLARY__EXPERIMENTAL_ENABLE_IMU

* parse IMU from camm

* introduce TelemetryMeasurement as baseclass

* print IMU data

* print IMU for comparison

* format

* rename

* relative imports

* fix tests
  • Loading branch information
ptpt authored Jan 17, 2025
1 parent ced3aa6 commit 37930a6
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 144 deletions.
111 changes: 87 additions & 24 deletions mapillary_tools/camm/camm_builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import io
import typing as T

from .. import geo, types
from .. import geo, telemetry, types
from ..mp4 import (
construct_mp4_parser as cparser,
mp4_sample_parser as sample_parser,
Expand All @@ -11,20 +11,65 @@
from . import camm_parser


def build_camm_sample(point: geo.Point) -> bytes:
    """Serialize a single GPS point as a CAMM ``MIN_GPS`` sample.

    The sample payload is ``[lat, lon, alt]``; a point without altitude
    is written with ``-1.0`` in the altitude slot.
    """
    altitude = point.alt if point.alt is not None else -1.0
    sample = {
        "type": camm_parser.CAMMType.MIN_GPS.value,
        "data": [point.lat, point.lon, altitude],
    }
    return camm_parser.CAMMSampleData.build(sample)
# Everything this module accepts as input for a CAMM sample: a GPS point or a
# telemetry measurement. NOTE: this alias has the same name as
# telemetry.TelemetryMeasurement but is wider, because it also admits geo.Point.
TelemetryMeasurement = T.Union[
geo.Point,
telemetry.TelemetryMeasurement,
]


def _create_edit_list(
def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes:
    """Serialize one measurement as the bytes of a CAMM sample.

    The CAMM sample type is chosen from the runtime class of ``measurement``:
    GPS points become ``MIN_GPS`` samples, and accelerometer, gyroscope and
    magnetometer readings become ``ACCELERATION``, ``GYRO`` and
    ``MAGNETIC_FIELD`` samples respectively.

    Raises:
        ValueError: if ``measurement`` is none of the supported classes.
    """
    if isinstance(measurement, geo.Point):
        # A point without altitude is written with -1.0 in the altitude slot.
        altitude = measurement.alt if measurement.alt is not None else -1.0
        camm_type = camm_parser.CAMMType.MIN_GPS
        payload = [measurement.lat, measurement.lon, altitude]
    elif isinstance(measurement, telemetry.AccelerationData):
        # Accelerometer reading in meters/second^2 along XYZ axes of the camera.
        camm_type = camm_parser.CAMMType.ACCELERATION
        payload = [measurement.x, measurement.y, measurement.z]
    elif isinstance(measurement, telemetry.GyroscopeData):
        # Gyroscope signal in radians/seconds around XYZ axes of the camera.
        # Rotation is positive in the counterclockwise direction.
        camm_type = camm_parser.CAMMType.GYRO
        payload = [measurement.x, measurement.y, measurement.z]
    elif isinstance(measurement, telemetry.MagnetometerData):
        # Ambient magnetic field.
        camm_type = camm_parser.CAMMType.MAGNETIC_FIELD
        payload = [measurement.x, measurement.y, measurement.z]
    else:
        raise ValueError(f"unexpected measurement type {type(measurement)}")

    return camm_parser.CAMMSampleData.build(
        {
            "type": camm_type.value,
            "data": payload,
        }
    )


def _create_edit_list_from_points(
point_segments: T.Sequence[T.Sequence[geo.Point]],
movie_timescale: int,
media_timescale: int,
Expand Down Expand Up @@ -82,18 +127,30 @@ def _create_edit_list(
}


def convert_points_to_raw_samples(
points: T.Sequence[geo.Point], timescale: int
def _multiplex(
    points: T.Sequence[geo.Point],
    measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None,
) -> T.List[TelemetryMeasurement]:
    """Merge GPS points and optional telemetry into one list ordered by time.

    Args:
        points: GPS points; they are placed before ``measurements`` prior to
            sorting.
        measurements: extra telemetry measurements; ``None`` or an empty list
            means there is nothing to merge in.

    Returns:
        A new list holding every input element, sorted by its ``time``
        attribute. The sort is stable, so elements with equal timestamps keep
        their input order (points first).
    """
    combined: T.List[TelemetryMeasurement] = list(points)
    if measurements:
        combined.extend(measurements)

    return sorted(combined, key=lambda entry: entry.time)


def convert_telemetry_to_raw_samples(
measurements: T.Sequence[TelemetryMeasurement],
timescale: int,
) -> T.Generator[sample_parser.RawSample, None, None]:
for idx, point in enumerate(points):
camm_sample_data = build_camm_sample(point)
for idx, measurement in enumerate(measurements):
camm_sample_data = _build_camm_sample(measurement)

if idx + 1 < len(points):
timedelta = int((points[idx + 1].time - point.time) * timescale)
if idx + 1 < len(measurements):
timedelta = int((measurements[idx + 1].time - measurement.time) * timescale)
else:
timedelta = 0

assert 0 <= timedelta <= builder.UINT32_MAX, (
f"expected timedelta {timedelta} between {points[idx]} and {points[idx + 1]} with timescale {timescale} to be <= UINT32_MAX"
f"expected timedelta {timedelta} between {measurements[idx]} and {measurements[idx + 1]} with timescale {timescale} to be <= UINT32_MAX"
)

yield sample_parser.RawSample(
Expand Down Expand Up @@ -232,19 +289,23 @@ def create_camm_trak(
}


def camm_sample_generator2(video_metadata: types.VideoMetadata):
def camm_sample_generator2(
video_metadata: types.VideoMetadata,
telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None,
):
def _f(
fp: T.BinaryIO,
moov_children: T.List[builder.BoxDict],
) -> T.Generator[io.IOBase, None, None]:
movie_timescale = builder.find_movie_timescale(moov_children)
# make sure the timedelta resolution is no coarser than 0.001s (1ms)
media_timescale = max(1000, movie_timescale)
measurements = _multiplex(video_metadata.points, telemetry_measurements)
camm_samples = list(
convert_points_to_raw_samples(video_metadata.points, media_timescale)
convert_telemetry_to_raw_samples(measurements, media_timescale)
)
camm_trak = create_camm_trak(camm_samples, media_timescale)
elst = _create_edit_list(
elst = _create_edit_list_from_points(
[video_metadata.points], movie_timescale, media_timescale
)
if T.cast(T.Dict, elst["data"])["entries"]:
Expand Down Expand Up @@ -280,6 +341,8 @@ def _f(
)

# if yield, the moov_children will not be modified
return (io.BytesIO(build_camm_sample(point)) for point in video_metadata.points)
return (
io.BytesIO(_build_camm_sample(measurement)) for measurement in measurements
)

return _f
159 changes: 117 additions & 42 deletions mapillary_tools/camm/camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@

import construct as C

from .. import geo
from ..mp4 import mp4_sample_parser as sample_parser, simple_mp4_parser as sparser
from .. import geo, telemetry
from ..mp4 import simple_mp4_parser as sparser
from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser


LOG = logging.getLogger(__name__)


# The measurement types this parser produces from CAMM samples: GPS points
# plus accelerometer, gyroscope and magnetometer readings.
TelemetryMeasurement = T.Union[
geo.Point,
telemetry.AccelerationData,
telemetry.GyroscopeData,
telemetry.MagnetometerData,
]


# Camera Motion Metadata Spec https://developers.google.com/streetview/publish/camm-spec
class CAMMType(Enum):
ANGLE_AXIS = 0
Expand Down Expand Up @@ -75,9 +84,9 @@ class CAMMType(Enum):
)


def _parse_point_from_sample(
fp: T.BinaryIO, sample: sample_parser.Sample
) -> T.Optional[geo.Point]:
def _parse_telemetry_from_sample(
fp: T.BinaryIO, sample: Sample
) -> T.Optional[TelemetryMeasurement]:
fp.seek(sample.raw_sample.offset, io.SEEK_SET)
data = fp.read(sample.raw_sample.size)
box = CAMMSampleData.parse(data)
Expand All @@ -99,12 +108,34 @@ def _parse_point_from_sample(
alt=box.data.altitude,
angle=None,
)
elif box.type == CAMMType.ACCELERATION.value:
return telemetry.AccelerationData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
elif box.type == CAMMType.GYRO.value:
return telemetry.GyroscopeData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
elif box.type == CAMMType.MAGNETIC_FIELD.value:
return telemetry.MagnetometerData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
return None


def filter_points_by_elst(
points: T.Iterable[geo.Point], elst: T.Sequence[T.Tuple[float, float]]
) -> T.Generator[geo.Point, None, None]:
def _filter_telemetry_by_elst_segments(
measurements: T.Iterable[TelemetryMeasurement],
elst: T.Sequence[T.Tuple[float, float]],
) -> T.Generator[TelemetryMeasurement, None, None]:
empty_elst = [entry for entry in elst if entry[0] == -1]
if empty_elst:
offset = empty_elst[-1][1]
Expand All @@ -114,20 +145,26 @@ def filter_points_by_elst(
elst = [entry for entry in elst if entry[0] != -1]

if not elst:
for p in points:
yield dataclasses.replace(p, time=p.time + offset)
for m in measurements:
if dataclasses.is_dataclass(m):
yield dataclasses.replace(m, time=m.time + offset)
else:
m._replace(time=m.time + offset)
return

elst.sort(key=lambda entry: entry[0])
elst_idx = 0
for p in points:
for m in measurements:
if len(elst) <= elst_idx:
break
media_time, duration = elst[elst_idx]
if p.time < media_time:
if m.time < media_time:
pass
elif p.time <= media_time + duration:
yield dataclasses.replace(p, time=p.time + offset)
elif m.time <= media_time + duration:
if dataclasses.is_dataclass(m):
yield dataclasses.replace(m, time=m.time + offset)
else:
m._replace(time=m.time + offset)
else:
elst_idx += 1

Expand All @@ -148,46 +185,84 @@ def _is_camm_description(description: T.Dict) -> bool:
return description["format"] == b"camm"


def _contains_camm_description(track: TrackBoxParser) -> bool:
    """Tell whether at least one sample description of ``track`` is CAMM."""
    for description in track.extract_sample_descriptions():
        if _is_camm_description(description):
            return True
    return False


def _filter_telemetry_by_track_elst(
    moov: MovieBoxParser,
    track: TrackBoxParser,
    measurements: T.Iterable[TelemetryMeasurement],
) -> T.List[TelemetryMeasurement]:
    """Apply the edit list of ``track`` (if any) to ``measurements``.

    When the track has no ``elst`` box, or the box has no entries, the
    measurements are returned unchanged as a list. Otherwise every entry is
    converted to seconds, using the movie timescale from ``mvhd`` and the
    media timescale from the track's ``mdhd``, and the measurements are
    filtered through ``_filter_telemetry_by_elst_segments``.
    """
    elst_boxdata = track.extract_elst_boxdata()
    if elst_boxdata is None:
        return list(measurements)

    elst_entries = elst_boxdata["entries"]
    if not elst_entries:
        return list(measurements)

    # The media timescale lives in the track, the movie timescale in the movie header.
    media_timescale = track.extract_mdhd_boxdata()["timescale"]
    movie_timescale = moov.extract_mvhd_boxdata()["timescale"]

    segments = []
    for entry in elst_entries:
        segments.append(
            elst_entry_to_seconds(
                entry,
                movie_timescale=movie_timescale,
                media_timescale=media_timescale,
            )
        )

    return list(_filter_telemetry_by_elst_segments(measurements, segments))


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
"""
Return a list of points (could be empty) if it is a valid CAMM video,
otherwise None
"""

points = None
moov = MovieBoxParser.parse_stream(fp)

moov = sample_parser.MovieBoxParser.parse_stream(fp)
for track in moov.extract_tracks():
descriptions = track.extract_sample_descriptions()
if any(_is_camm_description(d) for d in descriptions):
maybe_points = (
_parse_point_from_sample(fp, sample)
if _contains_camm_description(track):
maybe_measurements = (
_parse_telemetry_from_sample(fp, sample)
for sample in track.extract_samples()
if _is_camm_description(sample.description)
)
points = [p for p in maybe_points if p is not None]
if points:
elst_boxdata = track.extract_elst_boxdata()
if elst_boxdata is not None:
elst_entries = elst_boxdata["entries"]
if elst_entries:
# media_timescale
mdhd_boxdata = track.extract_mdhd_boxdata()
media_timescale = mdhd_boxdata["timescale"]
# movie_timescale
mvhd_boxdata = moov.extract_mvhd_boxdata()
movie_timescale = mvhd_boxdata["timescale"]
segments = [
elst_entry_to_seconds(
entry,
movie_timescale=movie_timescale,
media_timescale=media_timescale,
)
for entry in elst_entries
]
points = list(filter_points_by_elst(points, segments))
points = [m for m in maybe_measurements if isinstance(m, geo.Point)]

return points
return T.cast(
T.List[geo.Point], _filter_telemetry_by_track_elst(moov, track, points)
)

return None


def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[T.List[TelemetryMeasurement]]:
    """Extract the measurements stored in the first CAMM track of ``fp``.

    Returns:
        The parsed measurements of the first track that has a CAMM sample
        description (possibly an empty list), after the track's edit list has
        been applied, or ``None`` when no track has a CAMM description.
    """
    moov = MovieBoxParser.parse_stream(fp)

    for track in moov.extract_tracks():
        if not _contains_camm_description(track):
            continue

        parsed: T.List[TelemetryMeasurement] = []
        for sample in track.extract_samples():
            if not _is_camm_description(sample.description):
                continue
            measurement = _parse_telemetry_from_sample(fp, sample)
            # Samples that do not parse into a measurement come back as None.
            if measurement is not None:
                parsed.append(measurement)

        # Only the first CAMM track is considered.
        return _filter_telemetry_by_track_elst(moov, track, parsed)

    return None


def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]:
Expand Down
Loading

0 comments on commit 37930a6

Please sign in to comment.