Skip to content

Commit

Permalink
feat(platform): Add multimedia file support & add basic Video blocks (S…
Browse files Browse the repository at this point in the history
…ignificant-Gravitas#9320)

Currently, there is no support for passing files in the platform, each
generated file should be hosted somewhere.
This PR adds support of passing files temporarily during the execution
to open up more block that does multimedia operations.

<img width="583" alt="image"
src="https://github.com/user-attachments/assets/c285de5a-c2a9-41a0-9be1-305a316879d6"
/>

<img width="1291" alt="image"
src="https://github.com/user-attachments/assets/d7bcaf38-80fa-4b51-91da-b4eed80a02c1"
/>


### Changes 🏗️

* Add media support for passing files (local files, base64, URL) and
`FileStoreBlock` (file version of `StoreValueBlock`)
* Add initial multimedia blocks: `LoopVideoBlock` &
`AddAudioToVideoBlock`.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

---------

Co-authored-by: Nicholas Tindle <[email protected]>
  • Loading branch information
2 people authored and waterstark committed Jan 30, 2025
1 parent d82c9d3 commit 9097325
Show file tree
Hide file tree
Showing 13 changed files with 1,697 additions and 914 deletions.
37 changes: 37 additions & 0 deletions autogpt_platform/backend/backend/blocks/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,50 @@

from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import SchemaField
from backend.util.file import MediaFile, store_media_file
from backend.util.mock import MockObject
from backend.util.text import TextFormatter
from backend.util.type import convert

formatter = TextFormatter()


class FileStoreBlock(Block):
class Input(BlockSchema):
file_in: MediaFile = SchemaField(
description="The file to store in the temporary directory, it can be a URL, data URI, or local path."
)

class Output(BlockSchema):
file_out: MediaFile = SchemaField(
description="The relative path to the stored file in the temporary directory."
)

def __init__(self):
super().__init__(
id="cbb50872-625b-42f0-8203-a2ae78242d8a",
description="Stores the input file in the temporary directory.",
categories={BlockCategory.BASIC, BlockCategory.MULTIMEDIA},
input_schema=FileStoreBlock.Input,
output_schema=FileStoreBlock.Output,
static_output=True,
)

def run(
self,
input_data: Input,
*,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
file_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.file_in,
return_content=False,
)
yield "file_out", file_path


class StoreValueBlock(Block):
"""
This block allows you to provide a constant value as a block, in a stateless manner.
Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/backend/blocks/ideogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __init__(self):
super().__init__(
id="6ab085e2-20b3-4055-bc3e-08036e01eca6",
description="This block runs Ideogram models with both simple and advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=IdeogramModelBlock.Input,
output_schema=IdeogramModelBlock.Output,
test_input={
Expand Down
245 changes: 245 additions & 0 deletions autogpt_platform/backend/backend/blocks/media.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import os
import tempfile
from typing import Literal, Optional

from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip

from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import MediaFile, get_exec_file_path, store_media_file


class MediaDurationBlock(Block):

class Input(BlockSchema):
media_in: MediaFile = SchemaField(
description="Media input (URL, data URI, or local path)."
)
is_video: bool = SchemaField(
description="Whether the media is a video (True) or audio (False).",
default=True,
)

class Output(BlockSchema):
duration: float = SchemaField(
description="Duration of the media file (in seconds)."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)

def __init__(self):
super().__init__(
id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
description="Block to get the duration of a media file.",
categories={BlockCategory.MULTIMEDIA},
input_schema=MediaDurationBlock.Input,
output_schema=MediaDurationBlock.Output,
)

def run(
self,
input_data: Input,
*,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)

# 2) Load the clip
if input_data.is_video:
clip = VideoFileClip(media_abspath)
else:
clip = AudioFileClip(media_abspath)

yield "duration", clip.duration


class LoopVideoBlock(Block):
"""
Block for looping (repeating) a video clip until a given duration or number of loops.
"""

class Input(BlockSchema):
video_in: MediaFile = SchemaField(
description="The input video (can be a URL, data URI, or local path)."
)
# Provide EITHER a `duration` or `n_loops` or both. We'll demonstrate `duration`.
duration: Optional[float] = SchemaField(
description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
default=None,
ge=0.0,
)
n_loops: Optional[int] = SchemaField(
description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
default=None,
ge=1,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="How to return the output video. Either a relative path or base64 data URI.",
default="file_path",
)

class Output(BlockSchema):
video_out: str = SchemaField(
description="Looped video returned either as a relative path or a data URI."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)

def __init__(self):
super().__init__(
id="8bf9eef6-5451-4213-b265-25306446e94b",
description="Block to loop a video to a given duration or number of repeats.",
categories={BlockCategory.MULTIMEDIA},
input_schema=LoopVideoBlock.Input,
output_schema=LoopVideoBlock.Output,
)

def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)

# 2) Load the clip
clip = VideoFileClip(input_abspath)

# 3) Apply the loop effect
looped_clip = clip
if input_data.duration:
# Loop until we reach the specified duration
looped_clip = looped_clip.with_effects([Loop(duration=input_data.duration)])
elif input_data.n_loops:
looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")

assert isinstance(looped_clip, VideoFileClip)

# 4) Save the looped output
output_filename = MediaFile(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)

looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")

# Return as data URI
video_out = store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)

yield "video_out", video_out


class AddAudioToVideoBlock(Block):
"""
Block that adds (attaches) an audio track to an existing video.
Optionally scale the volume of the new track.
"""

class Input(BlockSchema):
video_in: MediaFile = SchemaField(
description="Video input (URL, data URI, or local path)."
)
audio_in: MediaFile = SchemaField(
description="Audio input (URL, data URI, or local path)."
)
volume: float = SchemaField(
description="Volume scale for the newly attached audio track (1.0 = original).",
default=1.0,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the final output as a relative path or base64 data URI.",
default="file_path",
)

class Output(BlockSchema):
video_out: MediaFile = SchemaField(
description="Final video (with attached audio), as a path or data URI."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)

def __init__(self):
super().__init__(
id="3503748d-62b6-4425-91d6-725b064af509",
description="Block to attach an audio file to a video file using moviepy.",
categories={BlockCategory.MULTIMEDIA},
input_schema=AddAudioToVideoBlock.Input,
output_schema=AddAudioToVideoBlock.Output,
)

def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
local_audio_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
return_content=False,
)

abs_temp_dir = os.path.join(tempfile.gettempdir(), "exec_file", graph_exec_id)
video_abspath = os.path.join(abs_temp_dir, local_video_path)
audio_abspath = os.path.join(abs_temp_dir, local_audio_path)

# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip = AudioFileClip(audio_abspath)
# Optionally scale volume
if input_data.volume != 1.0:
audio_clip = audio_clip.with_volume_scaled(input_data.volume)

# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_clip)

# 4) Write to output file
output_filename = MediaFile(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")

# 5) Return either path or data URI
video_out = store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)

yield "video_out", video_out
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(self):
super().__init__(
id="90f8c45e-e983-4644-aa0b-b4ebe2f531bc",
description="This block runs Flux models on Replicate with advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=ReplicateFluxAdvancedModelBlock.Input,
output_schema=ReplicateFluxAdvancedModelBlock.Output,
test_input={
Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/backend/blocks/talking_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self):
super().__init__(
id="98c6f503-8c47-4b1c-a96d-351fc7c87dab",
description="This block integrates with D-ID to create video clips and retrieve their URLs.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=CreateTalkingAvatarVideoBlock.Input,
output_schema=CreateTalkingAvatarVideoBlock.Output,
test_input={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self):
super().__init__(
id="4ff1ff6d-cc40-4caa-ae69-011daa20c378",
description="Converts text to speech using the Unreal Speech API",
categories={BlockCategory.AI, BlockCategory.TEXT},
categories={BlockCategory.AI, BlockCategory.TEXT, BlockCategory.MULTIMEDIA},
input_schema=UnrealTextToSpeechBlock.Input,
output_schema=UnrealTextToSpeechBlock.Output,
test_input={
Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/backend/backend/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class BlockCategory(Enum):
)
PRODUCTIVITY = "Block that helps with productivity"
ISSUE_TRACKING = "Block that helps with issue tracking"
MULTIMEDIA = "Block that interacts with multimedia content"

def dict(self) -> dict[str, str]:
return {"category": self.name, "description": self.value}
Expand Down
9 changes: 8 additions & 1 deletion autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util import json
from backend.util.decorator import error_logged, time_measured
from backend.util.file import clean_exec_files
from backend.util.logging import configure_logging
from backend.util.process import set_service_name
from backend.util.service import (
Expand Down Expand Up @@ -169,7 +170,12 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
log_metadata.info("Executed node with input", input=input_data_str)
update_execution(ExecutionStatus.RUNNING)

extra_exec_kwargs = {}
extra_exec_kwargs: dict = {
"graph_id": graph_id,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"node_exec_id": node_exec_id,
}
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
Expand Down Expand Up @@ -729,6 +735,7 @@ def callback(result: object):
finished = True
cancel.set()
cancel_thread.join()
clean_exec_files(graph_exec.graph_exec_id)

return (
exec_stats,
Expand Down
Loading

0 comments on commit 9097325

Please sign in to comment.