docs: Add section on using denormalized for generating realtime features (#4697)
@@ -0,0 +1,112 @@
# Streaming feature computation with Denormalized

Denormalized computes real-time features from streaming data and writes them directly to your Feast feature store. This guide walks you through setting up a streaming pipeline that computes windowed feature aggregations over a Kafka topic and pushes them to Feast in real time.

![Denormalized/Feast integration diagram](../assets/feast-denormalized.png)

## Prerequisites

- Python 3.8+
- Kafka cluster (local or remote)

For a full working demo, check out the [feast-example](https://github.com/probably-nothing-labs/feast-example) repo.

## Quick Start

1. Create a new Python project or use our template:
```bash
mkdir my-feature-project
cd my-feature-project
python -m venv .venv
source .venv/bin/activate  # or `.venv\Scripts\activate` on Windows
pip install "denormalized[feast]" feast  # quotes stop shells such as zsh from expanding the brackets
```

2. Set up your Feast feature repository:
```bash
feast init feature_repo
```

## Project Structure

Your project should look something like this:
```
my-feature-project/
├── feature_repo/
│   ├── feature_store.yaml
│   └── sensor_data.py       # Feature definitions
├── stream_job.py            # Denormalized pipeline
└── main.py                  # Pipeline runner
```

## Define Your Features

In `feature_repo/sensor_data.py`, define your entity, push source, and feature view. The push source name (`push_sensor_statistics`) is the name the streaming pipeline writes to, so the two must match:

```python
from feast import Entity, FeatureView, PushSource, Field
from feast.types import Float64, String

# NOTE: `ds` below is the FeastDataStream built in stream_job.py.
# It must be in scope in this module so its schema can be reused.

# Define your entity
sensor = Entity(
    name="sensor",
    join_keys=["sensor_name"],
)

# Create a push source for real-time features
source = PushSource(
    name="push_sensor_statistics",
    batch_source=your_batch_source,  # Placeholder: replace with a batch source you define
)

# Define your feature view
stats_view = FeatureView(
    name="sensor_statistics",
    entities=[sensor],
    schema=ds.get_feast_schema(),  # Denormalized handles this for you!
    source=source,
    online=True,
)
```

## Create Your Streaming Pipeline

In `stream_job.py`, define your streaming computations:

```python
import json

from denormalized import Context, FeastDataStream
from denormalized.datafusion import col, functions as f
from feast import FeatureStore

# A sample event, serialized to JSON and passed to from_topic() below
sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

# Create a stream from your Kafka topic
ds = FeastDataStream(Context().from_topic("temperature", json.dumps(sample_event), "localhost:9092"))

# Define your feature computations
ds = ds.window(
    [col("sensor_name")],  # Group by sensor
    [
        f.count(col("reading")).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
        f.avg(col("reading")).alias("average"),
    ],
    1000,  # Window size in ms
    None,  # Slide interval (None = tumbling window)
)

feature_store = FeatureStore(repo_path="feature_repo/")

# This single line connects Denormalized to Feast!
ds.write_feast_feature(feature_store, "push_sensor_statistics")
```
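
To make the windowing step concrete, here is a minimal plain-Python sketch of what a 1000 ms tumbling window grouped by `sensor_name` computes. It is an illustration of the semantics only, not how Denormalized implements them: the function name `tumbling_window_stats` is hypothetical, windows are assumed to align to multiples of the window size, and late or out-of-order events are ignored.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_window_stats(
    events: Iterable[dict], window_ms: int = 1000
) -> Dict[Tuple[str, int], dict]:
    """Group events by (sensor_name, window start) and aggregate the readings.

    Illustrative only: assumes windows start at multiples of `window_ms`.
    """
    readings: Dict[Tuple[str, int], List[float]] = defaultdict(list)
    for event in events:
        # Each event falls into exactly one non-overlapping window.
        window_start = (event["occurred_at_ms"] // window_ms) * window_ms
        readings[(event["sensor_name"], window_start)].append(event["reading"])

    # Same four aggregates, under the same aliases, as the pipeline above.
    return {
        key: {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "average": sum(values) / len(values),
        }
        for key, values in readings.items()
    }


events = [
    {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 1.0},
    {"occurred_at_ms": 900, "sensor_name": "foo", "reading": 3.0},
    {"occurred_at_ms": 1200, "sensor_name": "foo", "reading": 5.0},
    {"occurred_at_ms": 300, "sensor_name": "bar", "reading": 2.0},
]
stats = tumbling_window_stats(events)
```

Here the first two `foo` events share the window starting at 0 ms, while the third lands in the window starting at 1000 ms, so `foo` produces two separate feature rows.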
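Once the pipeline has pushed some windows, the features can be read back from the online store by entity key. The sketch below uses Feast's `get_online_features`; the helper names `feature_refs` and `read_latest_stats` are hypothetical, and it assumes the feature view exposes the aggregates under the aliases used in the pipeline (`count`, `min`, `max`, `average`), which depends on what `get_feast_schema()` returns.

```python
from typing import Any, Dict, List

FEATURE_VIEW = "sensor_statistics"
# Assumption: feature names match the aliases used in the streaming pipeline.
FEATURE_NAMES = ["count", "min", "max", "average"]


def feature_refs(view: str, names: List[str]) -> List[str]:
    """Build Feast feature references of the form '<view>:<feature>'."""
    return [f"{view}:{name}" for name in names]


def read_latest_stats(store: Any, sensor_name: str) -> Dict[str, List[Any]]:
    """Fetch the latest pushed statistics for one sensor.

    `store` is expected to be a feast.FeatureStore (typed as Any so that
    this sketch does not require Feast to be importable).
    """
    response = store.get_online_features(
        features=feature_refs(FEATURE_VIEW, FEATURE_NAMES),
        entity_rows=[{"sensor_name": sensor_name}],
    )
    return response.to_dict()
```

With a real store this would be called as `read_latest_stats(FeatureStore(repo_path="feature_repo/"), "foo")`, after `feast apply` has registered the definitions and the pipeline has written at least one window.
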
## Need Help?

- Email us at [email protected]
- Check out more examples on our [GitHub](https://github.com/probably-nothing-labs/denormalized)