Skip to content

Commit

Permalink
Merge pull request #53 from tryolabs/custom-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
dekked authored May 31, 2021
2 parents ee72ae2 + f5b512a commit b409c79
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 38 deletions.
28 changes: 28 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The class in charge of performing the tracking of the detections produced by the
- `initialization_delay (optional)`: Each tracked object waits till its internal hit intertia counter goes over `hit_inertia_min` to be considered as a potential object to be returned to the user by the Tracker. The argument `initialization_delay` determines by how much the object's hit inertia counter must exceed `hit_inertia_min` to be considered as initialized and get returned to the user as a real object. Defaults to `(hit_inertia_min + hit_inertia_max) / 2`.
- `detection_threshold (optional)`: Sets the threshold at which the scores of the points in a detection being fed into the tracker must dip below to be ignored by the tracker. Defaults to `0`.
- `point_transience (optional)`: Each tracked object keeps track of how much often of the points its tracking has been getting matched. Points that are getting matches are said to be live, and points which aren't are said to not be live. This determines things like which points in a tracked object get drawn by [`draw_tracked_objects`](#draw_tracked_objects) and which don't. This argument determines how short lived points not getting matched are. Defaults to `4`.
- `filter_setup (optional)`: This parameter can be used to change the parameters of the Kalman Filter that is used by [`TrackedObject`](#trackedobject) instances. Defaults to [`FilterSetup()`](#filtersetup).

### Tracker.update

Expand All @@ -37,6 +38,33 @@ Detections returned by the detector must be converted to a `Detection` object be
- `scores`: An array of length `number of points per object` which assigns a score to each of the points defined in `points`. This is used to inform the tracker of which points to ignore; any point with a score below `detection_threshold` will be ignored. This useful for cases in which detections don't always have every point detected, as is often the case in pose estimators.
- `data`: The place to store any extra data which may be useful when calculating the distance function. Anything stored here will be available to use inside the distance function. This enables the development of more interesting trackers which can do things like assign an appearance embedding to each detection to aid in its tracking.

## FilterSetup

This class can be used either to change some parameters of the [KalmanFilter](https://filterpy.readthedocs.io/en/latest/kalman/KalmanFilter.html) that the tracker uses, or to fully customize the predictive filter implementation to use (as long as the methods and properties are compatible).
The former case only requires changing the default parameters upon tracker creation: `tracker = Tracker(..., filter_setup=FilterSetup(R=100))`, while the latter requires creating your own class extending `FilterSetup`, and rewriting its `create_filter` method to return your own customized filter.


##### Arguments:

Note that these arguments correspond to the same parameters of the [`filterpy.KalmanFilter` (see docs)](https://filterpy.readthedocs.io/en/latest/kalman/KalmanFilter.html) that this class returns.
- `R`: Multiplier for the sensor measurement noise matrix. Defaults to `4.0`.
- `Q`: Multiplier for the process uncertainty. Defaults to `0.1`.
- `P`: Multiplier for the initial covariance matrix estimation, only in the entries that correspond to position (not speed) variables. Defaults to `10.0`.


### FilterSetup.create_filter

This function returns a new predictive filter instance with the current setup, to be used by each new [`TrackedObject`](#trackedobject) that is created. This predictive filter will be used to estimate speed and future positions of the object, to better match the detections during its trajectory.

This method may be overwritten by a subclass of `FilterSetup`, in case that further customizations of the filter parameters or implementation are needed.

##### Arguments:

- `initial_detection`: numpy array of shape `(number of points per object, 2)`, corresponding to the [`Detection.points`](#detection) of the tracked object being born, which shall be used as initial position estimation for it.

##### Returns:
A new `filterpy.KalmanFilter` instance (or an API compatible object, since the class is not restricted by type checking).

## TrackedObject

The objects returned by the tracker's `update` function on each iteration. They represent the objects currently being tracked by the tracker.
Expand Down
1 change: 1 addition & 0 deletions norfair/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .drawing import *
from .tracker import Detection, Tracker
from .filter import FilterSetup
from .utils import get_cutout, print_objects_as_table
from .video import Video
40 changes: 40 additions & 0 deletions norfair/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import numpy as np
from filterpy.kalman import KalmanFilter


class FilterSetup:
def __init__(self, R: float = 4.0, Q: float = 0.1, P: float = 10.0):
self.R = R
self.Q = Q
self.P = P

def create_filter(self, initial_detection: np.array):
num_points = initial_detection.shape[0]
dim_z = 2 * num_points
dim_x = 2 * 2 * num_points # We need to accommodate for velocities

filter = KalmanFilter(dim_x=dim_x, dim_z=dim_z)

# State transition matrix (models physics): numpy.array()
filter.F = np.eye(dim_x)
dt = 1 # At each step we update pos with v * dt

filter.F[:dim_z, dim_z:] = dt * np.eye(dim_z)

# Measurement function: numpy.array(dim_z, dim_x)
filter.H = np.eye(dim_z, dim_x,)

# Measurement uncertainty (sensor noise): numpy.array(dim_z, dim_z)
filter.R *= self.R

# Process uncertainty: numpy.array(dim_x, dim_x)
# Don't decrease it too much or trackers pay too little attention to detections
filter.Q[dim_z:, dim_z:] *= self.Q

# Initial state: numpy.array(dim_x, 1)
filter.x[:dim_z] = np.expand_dims(initial_detection.flatten(), 0).T

# Estimation uncertainty: numpy.array(dim_x, dim_x)
filter.P[dim_z:, dim_z:] *= self.P

return filter
48 changes: 10 additions & 38 deletions norfair/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from typing import Callable, List, Optional, Sequence

import numpy as np
from filterpy.kalman import KalmanFilter
from rich import print

from .utils import validate_points
from .filter import FilterSetup


class Tracker:
Expand All @@ -18,11 +18,13 @@ def __init__(
initialization_delay: Optional[int] = None,
detection_threshold: float = 0,
point_transience: int = 4,
filter_setup: "FilterSetup" = FilterSetup(),
):
self.tracked_objects: Sequence["TrackedObject"] = []
self.distance_function = distance_function
self.hit_inertia_min = hit_inertia_min
self.hit_inertia_max = hit_inertia_max
self.filter_setup = filter_setup

if initialization_delay is None:
self.initialization_delay = int(
Expand Down Expand Up @@ -74,6 +76,7 @@ def update(self, detections: Optional[List["Detection"]] = None, period: int = 1
self.detection_threshold,
self.period,
self.point_transience,
self.filter_setup,
)
)

Expand Down Expand Up @@ -197,14 +200,16 @@ def __init__(
detection_threshold: float,
period: int,
point_transience: int,
filter_setup: "FilterSetup",
):
try:
self.num_points = validate_points(initial_detection.points).shape[0]
initial_detection_points = validate_points(initial_detection.points)
except AttributeError:
print(
f"\n[red]ERROR[/red]: The detection list fed into `tracker.update()` should be composed of {Detection} objects not {type(initial_detection)}.\n"
)
exit()
self.num_points = initial_detection_points.shape[0]
self.hit_inertia_min: int = hit_inertia_min
self.hit_inertia_max: int = hit_inertia_max
self.initialization_delay = initialization_delay
Expand All @@ -228,44 +233,11 @@ def __init__(
TrackedObject.initializing_count
) # Just for debugging
TrackedObject.initializing_count += 1
self.setup_filter(initial_detection.points)
self.detected_at_least_once_points = np.array([False] * self.num_points)

def setup_filter(self, initial_detection: np.array):
initial_detection = validate_points(initial_detection)

dim_x = 2 * 2 * self.num_points # We need to accomodate for velocities
dim_z = 2 * self.num_points
self.dim_z = dim_z
self.filter = KalmanFilter(dim_x=dim_x, dim_z=dim_z)

# State transition matrix (models physics): numpy.array()
self.filter.F = np.eye(dim_x)
dt = 1 # At each step we update pos with v * dt
for p in range(dim_z):
self.filter.F[p, p + dim_z] = dt

# Measurement function: numpy.array(dim_z, dim_x)
self.filter.H = np.eye(
dim_z,
dim_x,
)

# Measurement uncertainty (sensor noise): numpy.array(dim_z, dim_z)
# TODO: maybe we should open this one to the users, as it lets them
# chose between giving more/less importance to the detections
self.filter.R *= 4.0

# Process uncertainty: numpy.array(dim_x, dim_x)
# Don't decrease it too much or trackers pay too little attention to detections
# self.filter.Q[:dim_z, :dim_z] /= 50
self.filter.Q[dim_z:, dim_z:] /= 10

# Initial state: numpy.array(dim_x, 1)
self.filter.x[:dim_z] = np.expand_dims(initial_detection.flatten(), 0).T

# Estimation uncertainty: numpy.array(dim_x, dim_x)
self.filter.P[dim_z:, dim_z:] *= 10.0
# Create Kalman Filter
self.filter = filter_setup.create_filter(initial_detection_points)
self.dim_z = 2 * self.num_points

def tracker_step(self):
self.hit_counter -= 1
Expand Down

0 comments on commit b409c79

Please sign in to comment.