diff --git a/src/backend/base/langflow/components/tools/__init__.py b/src/backend/base/langflow/components/tools/__init__.py
index 8be4d88331f6..3255eaf333fe 100644
--- a/src/backend/base/langflow/components/tools/__init__.py
+++ b/src/backend/base/langflow/components/tools/__init__.py
@@ -28,7 +28,6 @@
 from .wolfram_alpha_api import WolframAlphaAPIComponent
 from .yahoo import YfinanceComponent
 from .yahoo_finance import YfinanceToolComponent
-from .youtube_transcripts import YouTubeTranscriptsComponent
 
 with warnings.catch_warnings():
     warnings.simplefilter("ignore", LangChainDeprecationWarning)
@@ -64,5 +63,4 @@
     "WolframAlphaAPIComponent",
     "YfinanceComponent",
     "YfinanceToolComponent",
-    "YouTubeTranscriptsComponent",
 ]
diff --git a/src/backend/base/langflow/components/tools/youtube_transcripts.py b/src/backend/base/langflow/components/tools/youtube_transcripts.py
deleted file mode 100644
index 19250194bd16..000000000000
--- a/src/backend/base/langflow/components/tools/youtube_transcripts.py
+++ /dev/null
@@ -1,244 +0,0 @@
-from langchain_community.document_loaders import YoutubeLoader
-from langchain_community.document_loaders.youtube import TranscriptFormat
-
-from langflow.custom import Component
-from langflow.inputs import DropdownInput, IntInput, MultilineInput
-from langflow.schema import Message
-from langflow.template import Output
-
-
-class YouTubeTranscriptsComponent(Component):
-    """A component that extracts spoken content from YouTube videos as transcripts."""
-
-    display_name: str = "YouTube Transcripts"
-    description: str = "Extracts spoken content from YouTube videos as transcripts."
-    icon: str = "YouTube"
-    name = "YouTubeTranscripts"
-
-    inputs = [
-        MultilineInput(
-            name="url",
-            display_name="Video URL",
-            info="Enter the YouTube video URL to get transcripts from.",
-            tool_mode=True,
-            required=True,
-        ),
-        DropdownInput(
-            name="transcript_format",
-            display_name="Transcript Format",
-            options=["text", "chunks"],
-            value="text",
-            info="The format of the transcripts. Either 'text' for a single output or 'chunks' for timestamped chunks.",
-            advanced=True,
-        ),
-        IntInput(
-            name="chunk_size_seconds",
-            display_name="Chunk Size (seconds)",
-            value=60,
-            advanced=True,
-            info="The size of each transcript chunk in seconds. 
Only applicable when " - "'Transcript Format' is set to 'chunks'.", - ), - DropdownInput( - name="language", - display_name="Language", - options=[ - "af", - "ak", - "sq", - "am", - "ar", - "hy", - "as", - "ay", - "az", - "bn", - "eu", - "be", - "bho", - "bs", - "bg", - "my", - "ca", - "ceb", - "zh", - "zh-HK", - "zh-CN", - "zh-SG", - "zh-TW", - "zh-Hans", - "zh-Hant", - "hak-TW", - "nan-TW", - "co", - "hr", - "cs", - "da", - "dv", - "nl", - "en", - "en-US", - "eo", - "et", - "ee", - "fil", - "fi", - "fr", - "gl", - "lg", - "ka", - "de", - "el", - "gn", - "gu", - "ht", - "ha", - "haw", - "iw", - "hi", - "hmn", - "hu", - "is", - "ig", - "id", - "ga", - "it", - "ja", - "jv", - "kn", - "kk", - "km", - "rw", - "ko", - "kri", - "ku", - "ky", - "lo", - "la", - "lv", - "ln", - "lt", - "lb", - "mk", - "mg", - "ms", - "ml", - "mt", - "mi", - "mr", - "mn", - "ne", - "nso", - "no", - "ny", - "or", - "om", - "ps", - "fa", - "pl", - "pt", - "pa", - "qu", - "ro", - "ru", - "sm", - "sa", - "gd", - "sr", - "sn", - "sd", - "si", - "sk", - "sl", - "so", - "st", - "es", - "su", - "sw", - "sv", - "tg", - "ta", - "tt", - "te", - "th", - "ti", - "ts", - "tr", - "tk", - "uk", - "ur", - "ug", - "uz", - "vi", - "cy", - "fy", - "xh", - "yi", - "yo", - "zu", - ], - value="en", - info=( - "Specify to make sure the transcripts are retrieved in your desired language. Defaults to English: 'en'" - ), - ), - DropdownInput( - name="translation", - display_name="Translation Language", - advanced=True, - options=["", "en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "hi", "ar", "id"], - info="Translate the transcripts to the specified language. Leave empty for no translation.", - ), - ] - - outputs = [ - Output(name="transcripts", display_name="Transcription", method="build_youtube_transcripts"), - ] - - def build_youtube_transcripts(self) -> Message: - """Method to extracts transcripts from a YouTube video URL. - - Returns: - Message: The transcripts of the video as a text string. If 'transcript_format' - is 'text', the transcripts are returned as a single continuous string. If - 'transcript_format' is 'chunks', the transcripts are returned as a string - with timestamped segments. - - Raises: - Exception: Returns an error message if transcript retrieval fails. 
- """ - try: - # Attempt to load transcripts in the specified language, fallback to any available language - languages = [self.language] if self.language else None - loader = YoutubeLoader.from_youtube_url( - self.url, - transcript_format=TranscriptFormat.TEXT - if self.transcript_format == "text" - else TranscriptFormat.CHUNKS, - chunk_size_seconds=self.chunk_size_seconds, - language=languages, - translation=self.translation or None, - ) - - transcripts = loader.load() - - if self.transcript_format == "text": - # Extract only the page_content from the Document - result = transcripts[0].page_content - return Message(text=result) - - # For chunks, format the output with timestamps - formatted_chunks = [] - for doc in transcripts: - start_seconds = int(doc.metadata["start_seconds"]) - start_minutes = start_seconds // 60 - start_seconds %= 60 - timestamp = f"{start_minutes:02d}:{start_seconds:02d}" - formatted_chunks.append(f"{timestamp} {doc.page_content}") - result = "\n".join(formatted_chunks) - return Message(text=result) - - except Exception as exc: # noqa: BLE001 - # Using a specific error type for the return value - error_msg = f"Failed to get YouTube transcripts: {exc!s}" - return Message(text=error_msg) diff --git a/src/backend/base/langflow/components/youtube/__init__.py b/src/backend/base/langflow/components/youtube/__init__.py new file mode 100644 index 000000000000..4c4ab6f3326c --- /dev/null +++ b/src/backend/base/langflow/components/youtube/__init__.py @@ -0,0 +1,17 @@ +from .channel import YouTubeChannelComponent +from .comments import YouTubeCommentsComponent +from .playlist import YouTubePlaylistComponent +from .search import YouTubeSearchComponent +from .trending import YouTubeTrendingComponent +from .video_details import YouTubeVideoDetailsComponent +from .youtube_transcripts import YouTubeTranscriptsComponent + +__all__ = [ + "YouTubeChannelComponent", + "YouTubeCommentsComponent", + "YouTubePlaylistComponent", + "YouTubeSearchComponent", + "YouTubeTranscriptsComponent", + "YouTubeTrendingComponent", + "YouTubeVideoDetailsComponent", +] diff --git a/src/backend/base/langflow/components/youtube/channel.py b/src/backend/base/langflow/components/youtube/channel.py new file mode 100644 index 000000000000..62a6e5bcd93a --- /dev/null +++ b/src/backend/base/langflow/components/youtube/channel.py @@ -0,0 +1,227 @@ +from typing import Any +from urllib.error import HTTPError + +import pandas as pd +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from langflow.custom import Component +from langflow.inputs import BoolInput, MessageTextInput, SecretStrInput +from langflow.schema import DataFrame +from langflow.template import Output + + +class YouTubeChannelComponent(Component): + """A component that retrieves detailed information about YouTube channels.""" + + display_name: str = "YouTube Channel" + description: str = "Retrieves detailed information and statistics about YouTube channels as a DataFrame." 
+ icon: str = "YouTube" + + # Constants + CHANNEL_ID_LENGTH = 24 + QUOTA_EXCEEDED_STATUS = 403 + NOT_FOUND_STATUS = 404 + MAX_PLAYLIST_RESULTS = 10 + + inputs = [ + MessageTextInput( + name="channel_url", + display_name="Channel URL or ID", + info="The URL or ID of the YouTube channel.", + tool_mode=True, + required=True, + ), + SecretStrInput( + name="api_key", + display_name="YouTube API Key", + info="Your YouTube Data API key.", + required=True, + ), + BoolInput( + name="include_statistics", + display_name="Include Statistics", + value=True, + info="Include channel statistics (views, subscribers, videos).", + ), + BoolInput( + name="include_branding", + display_name="Include Branding", + value=True, + info="Include channel branding settings (banner, thumbnails).", + advanced=True, + ), + BoolInput( + name="include_playlists", + display_name="Include Playlists", + value=False, + info="Include channel's public playlists.", + advanced=True, + ), + ] + + outputs = [ + Output(name="channel_df", display_name="Channel Info", method="get_channel_info"), + ] + + def _extract_channel_id(self, channel_url: str) -> str: + """Extracts the channel ID from various YouTube channel URL formats.""" + import re + + if channel_url.startswith("UC") and len(channel_url) == self.CHANNEL_ID_LENGTH: + return channel_url + + patterns = { + "custom_url": r"youtube\.com\/c\/([^\/\n?]+)", + "channel_id": r"youtube\.com\/channel\/([^\/\n?]+)", + "user": r"youtube\.com\/user\/([^\/\n?]+)", + "handle": r"youtube\.com\/@([^\/\n?]+)", + } + + for pattern_type, pattern in patterns.items(): + match = re.search(pattern, channel_url) + if match: + if pattern_type == "channel_id": + return match.group(1) + return self._get_channel_id_by_name(match.group(1), pattern_type) + + return channel_url + + def _get_channel_id_by_name(self, channel_name: str, identifier_type: str) -> str: + """Gets the channel ID using the channel name or custom URL.""" + youtube = None + try: + youtube = build("youtube", "v3", developerKey=self.api_key) + + if identifier_type == "handle": + channel_name = channel_name.lstrip("@") + + request = youtube.search().list(part="id", q=channel_name, type="channel", maxResults=1) + response = request.execute() + + if response["items"]: + return response["items"][0]["id"]["channelId"] + + error_msg = f"Could not find channel ID for: {channel_name}" + raise ValueError(error_msg) + + except (HttpError, HTTPError) as e: + error_msg = f"YouTube API error while getting channel ID: {e!s}" + raise RuntimeError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error while getting channel ID: {e!s}" + raise ValueError(error_msg) from e + finally: + if youtube: + youtube.close() + + def _get_channel_playlists(self, youtube: Any, channel_id: str) -> list[dict[str, Any]]: + """Gets the public playlists for a channel.""" + try: + playlists_request = youtube.playlists().list( + part="snippet,contentDetails", + channelId=channel_id, + maxResults=self.MAX_PLAYLIST_RESULTS, + ) + playlists_response = playlists_request.execute() + playlists = [] + + for item in playlists_response.get("items", []): + playlist_data = { + "playlist_title": item["snippet"]["title"], + "playlist_description": item["snippet"]["description"], + "playlist_id": item["id"], + "playlist_video_count": item["contentDetails"]["itemCount"], + "playlist_published_at": item["snippet"]["publishedAt"], + "playlist_thumbnail_url": item["snippet"]["thumbnails"]["default"]["url"], + } + playlists.append(playlist_data) + + return playlists + 
except (HttpError, HTTPError) as e: + return [{"error": str(e)}] + else: + return playlists + + def get_channel_info(self) -> DataFrame: + """Retrieves channel information and returns it as a DataFrame.""" + youtube = None + try: + # Get channel ID and initialize YouTube API client + channel_id = self._extract_channel_id(self.channel_url) + youtube = build("youtube", "v3", developerKey=self.api_key) + + # Prepare parts for the API request + parts = ["snippet", "contentDetails"] + if self.include_statistics: + parts.append("statistics") + if self.include_branding: + parts.append("brandingSettings") + + # Get channel information + channel_response = youtube.channels().list(part=",".join(parts), id=channel_id).execute() + + if not channel_response["items"]: + return DataFrame(pd.DataFrame({"error": ["Channel not found"]})) + + channel_info = channel_response["items"][0] + + # Build basic channel data + channel_data = { + "title": [channel_info["snippet"]["title"]], + "description": [channel_info["snippet"]["description"]], + "custom_url": [channel_info["snippet"].get("customUrl", "")], + "published_at": [channel_info["snippet"]["publishedAt"]], + "country": [channel_info["snippet"].get("country", "Not specified")], + "channel_id": [channel_id], + } + + # Add thumbnails + for size, thumb in channel_info["snippet"]["thumbnails"].items(): + channel_data[f"thumbnail_{size}"] = [thumb["url"]] + + # Add statistics if requested + if self.include_statistics: + stats = channel_info["statistics"] + channel_data.update( + { + "view_count": [int(stats.get("viewCount", 0))], + "subscriber_count": [int(stats.get("subscriberCount", 0))], + "hidden_subscriber_count": [stats.get("hiddenSubscriberCount", False)], + "video_count": [int(stats.get("videoCount", 0))], + } + ) + + # Add branding if requested + if self.include_branding: + branding = channel_info.get("brandingSettings", {}) + channel_data.update( + { + "brand_title": [branding.get("channel", {}).get("title", "")], + "brand_description": [branding.get("channel", {}).get("description", "")], + "brand_keywords": [branding.get("channel", {}).get("keywords", "")], + "brand_banner_url": [branding.get("image", {}).get("bannerExternalUrl", "")], + } + ) + + # Create the initial DataFrame + channel_df = pd.DataFrame(channel_data) + + # Add playlists if requested + if self.include_playlists: + playlists = self._get_channel_playlists(youtube, channel_id) + if playlists and "error" not in playlists[0]: + # Create a DataFrame for playlists + playlists_df = pd.DataFrame(playlists) + # Join with main DataFrame + channel_df = pd.concat([channel_df] * len(playlists_df), ignore_index=True) + for column in playlists_df.columns: + channel_df[column] = playlists_df[column].to_numpy() + + return DataFrame(channel_df) + + except (HttpError, HTTPError, Exception) as e: + return DataFrame(pd.DataFrame({"error": [str(e)]})) + finally: + if youtube: + youtube.close() diff --git a/src/backend/base/langflow/components/youtube/comments.py b/src/backend/base/langflow/components/youtube/comments.py new file mode 100644 index 000000000000..05fccce56b2f --- /dev/null +++ b/src/backend/base/langflow/components/youtube/comments.py @@ -0,0 +1,231 @@ +from contextlib import contextmanager + +import pandas as pd +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from langflow.custom import Component +from langflow.inputs import BoolInput, DropdownInput, IntInput, MessageTextInput, SecretStrInput +from langflow.schema import DataFrame +from 
langflow.template import Output + + +class YouTubeCommentsComponent(Component): + """A component that retrieves comments from YouTube videos.""" + + display_name: str = "YouTube Comments" + description: str = "Retrieves and analyzes comments from YouTube videos." + icon: str = "YouTube" + + # Constants + COMMENTS_DISABLED_STATUS = 403 + NOT_FOUND_STATUS = 404 + API_MAX_RESULTS = 100 + + inputs = [ + MessageTextInput( + name="video_url", + display_name="Video URL", + info="The URL of the YouTube video to get comments from.", + tool_mode=True, + required=True, + ), + SecretStrInput( + name="api_key", + display_name="YouTube API Key", + info="Your YouTube Data API key.", + required=True, + ), + IntInput( + name="max_results", + display_name="Max Results", + value=20, + info="The maximum number of comments to return.", + ), + DropdownInput( + name="sort_by", + display_name="Sort By", + options=["time", "relevance"], + value="relevance", + info="Sort comments by time or relevance.", + ), + BoolInput( + name="include_replies", + display_name="Include Replies", + value=False, + info="Whether to include replies to comments.", + advanced=True, + ), + BoolInput( + name="include_metrics", + display_name="Include Metrics", + value=True, + info="Include metrics like like count and reply count.", + advanced=True, + ), + ] + + outputs = [ + Output(name="comments", display_name="Comments", method="get_video_comments"), + ] + + def _extract_video_id(self, video_url: str) -> str: + """Extracts the video ID from a YouTube URL.""" + import re + + patterns = [ + r"(?:youtube\.com\/watch\?v=|youtu.be\/|youtube.com\/embed\/)([^&\n?#]+)", + r"youtube.com\/shorts\/([^&\n?#]+)", + ] + + for pattern in patterns: + match = re.search(pattern, video_url) + if match: + return match.group(1) + + return video_url.strip() + + def _process_reply(self, reply: dict, parent_id: str, *, include_metrics: bool = True) -> dict: + """Process a single reply comment.""" + reply_snippet = reply["snippet"] + reply_data = { + "comment_id": reply["id"], + "parent_comment_id": parent_id, + "author": reply_snippet["authorDisplayName"], + "text": reply_snippet["textDisplay"], + "published_at": reply_snippet["publishedAt"], + "is_reply": True, + } + if include_metrics: + reply_data["like_count"] = reply_snippet["likeCount"] + reply_data["reply_count"] = 0 # Replies can't have replies + + return reply_data + + def _process_comment( + self, item: dict, *, include_metrics: bool = True, include_replies: bool = False + ) -> list[dict]: + """Process a single comment thread.""" + comment = item["snippet"]["topLevelComment"]["snippet"] + comment_id = item["snippet"]["topLevelComment"]["id"] + + # Basic comment data + processed_comments = [ + { + "comment_id": comment_id, + "parent_comment_id": "", # Empty for top-level comments + "author": comment["authorDisplayName"], + "author_channel_url": comment.get("authorChannelUrl", ""), + "text": comment["textDisplay"], + "published_at": comment["publishedAt"], + "updated_at": comment["updatedAt"], + "is_reply": False, + } + ] + + # Add metrics if requested + if include_metrics: + processed_comments[0].update( + { + "like_count": comment["likeCount"], + "reply_count": item["snippet"]["totalReplyCount"], + } + ) + + # Add replies if requested + if include_replies and item["snippet"]["totalReplyCount"] > 0 and "replies" in item: + for reply in item["replies"]["comments"]: + reply_data = self._process_reply(reply, parent_id=comment_id, include_metrics=include_metrics) + processed_comments.append(reply_data) + 
+ return processed_comments + + @contextmanager + def youtube_client(self): + """Context manager for YouTube API client.""" + client = build("youtube", "v3", developerKey=self.api_key) + try: + yield client + finally: + client.close() + + def get_video_comments(self) -> DataFrame: + """Retrieves comments from a YouTube video and returns as DataFrame.""" + try: + # Extract video ID from URL + video_id = self._extract_video_id(self.video_url) + + # Use context manager for YouTube API client + with self.youtube_client() as youtube: + comments_data = [] + results_count = 0 + request = youtube.commentThreads().list( + part="snippet,replies", + videoId=video_id, + maxResults=min(self.API_MAX_RESULTS, self.max_results), + order=self.sort_by, + textFormat="plainText", + ) + + while request and results_count < self.max_results: + response = request.execute() + + for item in response.get("items", []): + if results_count >= self.max_results: + break + + comments = self._process_comment( + item, include_metrics=self.include_metrics, include_replies=self.include_replies + ) + comments_data.extend(comments) + results_count += 1 + + # Get the next page if available and needed + if "nextPageToken" in response and results_count < self.max_results: + request = youtube.commentThreads().list( + part="snippet,replies", + videoId=video_id, + maxResults=min(self.API_MAX_RESULTS, self.max_results - results_count), + order=self.sort_by, + textFormat="plainText", + pageToken=response["nextPageToken"], + ) + else: + request = None + + # Convert to DataFrame + comments_df = pd.DataFrame(comments_data) + + # Add video metadata + comments_df["video_id"] = video_id + comments_df["video_url"] = self.video_url + + # Sort columns for better organization + column_order = [ + "video_id", + "video_url", + "comment_id", + "parent_comment_id", + "is_reply", + "author", + "author_channel_url", + "text", + "published_at", + "updated_at", + ] + + if self.include_metrics: + column_order.extend(["like_count", "reply_count"]) + + comments_df = comments_df[column_order] + + return DataFrame(comments_df) + + except HttpError as e: + error_message = f"YouTube API error: {e!s}" + if e.resp.status == self.COMMENTS_DISABLED_STATUS: + error_message = "Comments are disabled for this video or API quota exceeded." + elif e.resp.status == self.NOT_FOUND_STATUS: + error_message = "Video not found." + + return DataFrame(pd.DataFrame({"error": [error_message]})) diff --git a/src/backend/base/langflow/components/youtube/playlist.py b/src/backend/base/langflow/components/youtube/playlist.py new file mode 100644 index 000000000000..d81d657eadc2 --- /dev/null +++ b/src/backend/base/langflow/components/youtube/playlist.py @@ -0,0 +1,32 @@ +from pytube import Playlist # Ensure you have pytube installed + +from langflow.custom import Component +from langflow.inputs import MessageTextInput +from langflow.schema import Data, DataFrame +from langflow.template import Output + + +class YouTubePlaylistComponent(Component): + display_name = "Youtube Playlist" + description = "Extracts all video URLs from a YouTube playlist." 
+ icon = "YouTube" # Replace with a suitable icon + + inputs = [ + MessageTextInput( + name="playlist_url", + display_name="Playlist URL", + info="URL of the YouTube playlist.", + required=True, + ), + ] + + outputs = [ + Output(display_name="Video URLs", name="video_urls", method="extract_video_urls"), + ] + + def extract_video_urls(self) -> DataFrame: + playlist_url = self.playlist_url + playlist = Playlist(playlist_url) + video_urls = [video.watch_url for video in playlist.videos] + + return DataFrame([Data(data={"video_url": url}) for url in video_urls]) diff --git a/src/backend/base/langflow/components/youtube/search.py b/src/backend/base/langflow/components/youtube/search.py new file mode 100644 index 000000000000..1efdee7f0f0d --- /dev/null +++ b/src/backend/base/langflow/components/youtube/search.py @@ -0,0 +1,120 @@ +from contextlib import contextmanager + +import pandas as pd +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from langflow.custom import Component +from langflow.inputs import BoolInput, DropdownInput, IntInput, MessageTextInput, SecretStrInput +from langflow.schema import DataFrame +from langflow.template import Output + + +class YouTubeSearchComponent(Component): + """A component that searches YouTube videos.""" + + display_name: str = "YouTube Search" + description: str = "Searches YouTube videos based on query." + icon: str = "YouTube" + + inputs = [ + MessageTextInput( + name="query", + display_name="Search Query", + info="The search query to look for on YouTube.", + tool_mode=True, + required=True, + ), + SecretStrInput( + name="api_key", + display_name="YouTube API Key", + info="Your YouTube Data API key.", + required=True, + ), + IntInput( + name="max_results", + display_name="Max Results", + value=10, + info="The maximum number of results to return.", + ), + DropdownInput( + name="order", + display_name="Sort Order", + options=["relevance", "date", "rating", "title", "viewCount"], + value="relevance", + info="Sort order for the search results.", + ), + BoolInput( + name="include_metadata", + display_name="Include Metadata", + value=True, + info="Include video metadata like description and statistics.", + advanced=True, + ), + ] + + outputs = [ + Output(name="results", display_name="Search Results", method="search_videos"), + ] + + @contextmanager + def youtube_client(self): + """Context manager for YouTube API client.""" + client = build("youtube", "v3", developerKey=self.api_key) + try: + yield client + finally: + client.close() + + def search_videos(self) -> DataFrame: + """Searches YouTube videos and returns results as DataFrame.""" + try: + with self.youtube_client() as youtube: + search_response = ( + youtube.search() + .list( + q=self.query, + part="id,snippet", + maxResults=self.max_results, + order=self.order, + type="video", + ) + .execute() + ) + + results = [] + for search_result in search_response.get("items", []): + video_id = search_result["id"]["videoId"] + snippet = search_result["snippet"] + + result = { + "video_id": video_id, + "title": snippet["title"], + "description": snippet["description"], + "published_at": snippet["publishedAt"], + "channel_title": snippet["channelTitle"], + "thumbnail_url": snippet["thumbnails"]["default"]["url"], + } + + if self.include_metadata: + # Get video details for additional metadata + video_response = youtube.videos().list(part="statistics,contentDetails", id=video_id).execute() + + if video_response.get("items"): + video_details = video_response["items"][0] + 
result.update( + { + "view_count": int(video_details["statistics"]["viewCount"]), + "like_count": int(video_details["statistics"].get("likeCount", 0)), + "comment_count": int(video_details["statistics"].get("commentCount", 0)), + "duration": video_details["contentDetails"]["duration"], + } + ) + + results.append(result) + + return DataFrame(pd.DataFrame(results)) + + except HttpError as e: + error_message = f"YouTube API error: {e!s}" + return DataFrame(pd.DataFrame({"error": [error_message]})) diff --git a/src/backend/base/langflow/components/youtube/trending.py b/src/backend/base/langflow/components/youtube/trending.py new file mode 100644 index 000000000000..85ad669771b5 --- /dev/null +++ b/src/backend/base/langflow/components/youtube/trending.py @@ -0,0 +1,286 @@ +from contextlib import contextmanager + +import pandas as pd +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from langflow.custom import Component +from langflow.inputs import BoolInput, DropdownInput, IntInput, SecretStrInput +from langflow.schema import DataFrame +from langflow.template import Output + +HTTP_FORBIDDEN = 403 +HTTP_NOT_FOUND = 404 +MAX_API_RESULTS = 50 + + +class YouTubeTrendingComponent(Component): + """A component that retrieves trending videos from YouTube.""" + + display_name: str = "YouTube Trending" + description: str = "Retrieves trending videos from YouTube with filtering options." + icon: str = "YouTube" + + # Dictionary of country codes and names + COUNTRY_CODES = { + "Global": "US", # Default to US for global + "United States": "US", + "Brazil": "BR", + "United Kingdom": "GB", + "India": "IN", + "Japan": "JP", + "South Korea": "KR", + "Germany": "DE", + "France": "FR", + "Canada": "CA", + "Australia": "AU", + "Spain": "ES", + "Italy": "IT", + "Mexico": "MX", + "Russia": "RU", + "Netherlands": "NL", + "Poland": "PL", + "Argentina": "AR", + } + + # Dictionary of video categories + VIDEO_CATEGORIES = { + "All": "0", + "Film & Animation": "1", + "Autos & Vehicles": "2", + "Music": "10", + "Pets & Animals": "15", + "Sports": "17", + "Travel & Events": "19", + "Gaming": "20", + "People & Blogs": "22", + "Comedy": "23", + "Entertainment": "24", + "News & Politics": "25", + "Education": "27", + "Science & Technology": "28", + "Nonprofits & Activism": "29", + } + + inputs = [ + SecretStrInput( + name="api_key", + display_name="YouTube API Key", + info="Your YouTube Data API key.", + required=True, + ), + DropdownInput( + name="region", + display_name="Region", + options=list(COUNTRY_CODES.keys()), + value="Global", + info="The region to get trending videos from.", + ), + DropdownInput( + name="category", + display_name="Category", + options=list(VIDEO_CATEGORIES.keys()), + value="All", + info="The category of videos to retrieve.", + ), + IntInput( + name="max_results", + display_name="Max Results", + value=10, + info="Maximum number of trending videos to return (1-50).", + ), + BoolInput( + name="include_statistics", + display_name="Include Statistics", + value=True, + info="Include video statistics (views, likes, comments).", + ), + BoolInput( + name="include_content_details", + display_name="Include Content Details", + value=True, + info="Include video duration and quality info.", + advanced=True, + ), + BoolInput( + name="include_thumbnails", + display_name="Include Thumbnails", + value=True, + info="Include video thumbnail URLs.", + advanced=True, + ), + ] + + outputs = [ + Output(name="trending_videos", display_name="Trending Videos", 
method="get_trending_videos"), + ] + + max_results: int + + def _format_duration(self, duration: str) -> str: + """Formats ISO 8601 duration to readable format.""" + import re + + # Remove 'PT' from the start of duration + duration = duration[2:] + + hours = 0 + minutes = 0 + seconds = 0 + + # Extract hours, minutes and seconds + time_dict = {} + for time_unit in ["H", "M", "S"]: + match = re.search(r"(\d+)" + time_unit, duration) + if match: + time_dict[time_unit] = int(match.group(1)) + + if "H" in time_dict: + hours = time_dict["H"] + if "M" in time_dict: + minutes = time_dict["M"] + if "S" in time_dict: + seconds = time_dict["S"] + + # Format the time string + if hours > 0: + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + return f"{minutes:02d}:{seconds:02d}" + + @contextmanager + def youtube_client(self): + """Context manager for YouTube API client.""" + client = build("youtube", "v3", developerKey=self.api_key) + try: + yield client + finally: + client.close() + + def get_trending_videos(self) -> DataFrame: + """Retrieves trending videos from YouTube and returns as DataFrame.""" + try: + # Validate max_results + if not 1 <= self.max_results <= MAX_API_RESULTS: + self.max_results = min(max(1, self.max_results), MAX_API_RESULTS) + + # Use context manager for YouTube API client + with self.youtube_client() as youtube: + # Get country code + region_code = self.COUNTRY_CODES[self.region] + + # Prepare API request parts + parts = ["snippet"] + if self.include_statistics: + parts.append("statistics") + if self.include_content_details: + parts.append("contentDetails") + + # Prepare API request parameters + request_params = { + "part": ",".join(parts), + "chart": "mostPopular", + "regionCode": region_code, + "maxResults": self.max_results, + } + + # Add category filter if not "All" + if self.category != "All": + request_params["videoCategoryId"] = self.VIDEO_CATEGORIES[self.category] + + # Get trending videos + request = youtube.videos().list(**request_params) + response = request.execute() + + videos_data = [] + for item in response.get("items", []): + video_data = { + "video_id": item["id"], + "title": item["snippet"]["title"], + "description": item["snippet"]["description"], + "channel_id": item["snippet"]["channelId"], + "channel_title": item["snippet"]["channelTitle"], + "published_at": item["snippet"]["publishedAt"], + "url": f"https://www.youtube.com/watch?v={item['id']}", + "region": self.region, + "category": self.category, + } + + # Add thumbnails if requested + if self.include_thumbnails: + for size, thumb in item["snippet"]["thumbnails"].items(): + video_data[f"thumbnail_{size}_url"] = thumb["url"] + video_data[f"thumbnail_{size}_width"] = thumb.get("width", 0) + video_data[f"thumbnail_{size}_height"] = thumb.get("height", 0) + + # Add statistics if requested + if self.include_statistics and "statistics" in item: + video_data.update( + { + "view_count": int(item["statistics"].get("viewCount", 0)), + "like_count": int(item["statistics"].get("likeCount", 0)), + "comment_count": int(item["statistics"].get("commentCount", 0)), + } + ) + + # Add content details if requested + if self.include_content_details and "contentDetails" in item: + content_details = item["contentDetails"] + video_data.update( + { + "duration": self._format_duration(content_details["duration"]), + "definition": content_details.get("definition", "hd").upper(), + "has_captions": content_details.get("caption", "false") == "true", + "licensed_content": content_details.get("licensedContent", False), + 
"projection": content_details.get("projection", "rectangular"), + } + ) + + videos_data.append(video_data) + + # Convert to DataFrame + videos_df = pd.DataFrame(videos_data) + + # Organize columns + column_order = [ + "video_id", + "title", + "channel_id", + "channel_title", + "category", + "region", + "published_at", + "url", + "description", + ] + + if self.include_statistics: + column_order.extend(["view_count", "like_count", "comment_count"]) + + if self.include_content_details: + column_order.extend(["duration", "definition", "has_captions", "licensed_content", "projection"]) + + # Add thumbnail columns at the end if included + if self.include_thumbnails: + thumbnail_cols = [col for col in videos_df.columns if col.startswith("thumbnail_")] + column_order.extend(sorted(thumbnail_cols)) + + # Reorder columns, including any that might not be in column_order + remaining_cols = [col for col in videos_df.columns if col not in column_order] + videos_df = videos_df[column_order + remaining_cols] + + return DataFrame(videos_df) + + except HttpError as e: + error_message = f"YouTube API error: {e}" + if e.resp.status == HTTP_FORBIDDEN: + error_message = "API quota exceeded or access forbidden." + elif e.resp.status == HTTP_NOT_FOUND: + error_message = "Resource not found." + + return DataFrame(pd.DataFrame({"error": [error_message]})) + + except Exception as e: + import logging + + logging.exception("An unexpected error occurred:") + return DataFrame(pd.DataFrame({"error": [str(e)]})) diff --git a/src/backend/base/langflow/components/youtube/video_details.py b/src/backend/base/langflow/components/youtube/video_details.py new file mode 100644 index 000000000000..013d1d46514c --- /dev/null +++ b/src/backend/base/langflow/components/youtube/video_details.py @@ -0,0 +1,263 @@ +from contextlib import contextmanager + +import googleapiclient +import pandas as pd +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from langflow.custom import Component +from langflow.inputs import BoolInput, MessageTextInput, SecretStrInput +from langflow.schema import DataFrame +from langflow.template import Output + + +class YouTubeVideoDetailsComponent(Component): + """A component that retrieves detailed information about YouTube videos.""" + + display_name: str = "YouTube Video Details" + description: str = "Retrieves detailed information and statistics about YouTube videos." 
+ icon: str = "YouTube" + + inputs = [ + MessageTextInput( + name="video_url", + display_name="Video URL", + info="The URL of the YouTube video.", + tool_mode=True, + required=True, + ), + SecretStrInput( + name="api_key", + display_name="YouTube API Key", + info="Your YouTube Data API key.", + required=True, + ), + BoolInput( + name="include_statistics", + display_name="Include Statistics", + value=True, + info="Include video statistics (views, likes, comments).", + ), + BoolInput( + name="include_content_details", + display_name="Include Content Details", + value=True, + info="Include video duration, quality, and age restriction info.", + advanced=True, + ), + BoolInput( + name="include_tags", + display_name="Include Tags", + value=True, + info="Include video tags and keywords.", + advanced=True, + ), + BoolInput( + name="include_thumbnails", + display_name="Include Thumbnails", + value=True, + info="Include video thumbnail URLs in different resolutions.", + advanced=True, + ), + ] + + outputs = [ + Output(name="video_data", display_name="Video Data", method="get_video_details"), + ] + + API_FORBIDDEN = 403 + VIDEO_NOT_FOUND = 404 + + @contextmanager + def youtube_client(self): + """Context manager for YouTube API client.""" + client = build("youtube", "v3", developerKey=self.api_key) + try: + yield client + finally: + client.close() + + def _extract_video_id(self, video_url: str) -> str: + """Extracts the video ID from a YouTube URL.""" + import re + + patterns = [ + r"(?:youtube\.com\/watch\?v=|youtu.be\/|youtube.com\/embed\/)([^&\n?#]+)", + r"youtube.com\/shorts\/([^&\n?#]+)", + ] + + for pattern in patterns: + match = re.search(pattern, video_url) + if match: + return match.group(1) + + return video_url.strip() + + def _format_duration(self, duration: str) -> str: + """Formats the ISO 8601 duration to a readable format.""" + import re + + hours = 0 + minutes = 0 + seconds = 0 + + hours_match = re.search(r"(\d+)H", duration) + minutes_match = re.search(r"(\d+)M", duration) + seconds_match = re.search(r"(\d+)S", duration) + + if hours_match: + hours = int(hours_match.group(1)) + if minutes_match: + minutes = int(minutes_match.group(1)) + if seconds_match: + seconds = int(seconds_match.group(1)) + + if hours > 0: + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + return f"{minutes:02d}:{seconds:02d}" + + def get_video_details(self) -> DataFrame: + """Retrieves detailed information about a YouTube video and returns as DataFrame.""" + try: + with self.youtube_client() as youtube: + # Extract video ID + video_id = self._extract_video_id(self.video_url) + + # Prepare parts for the API request + parts = ["snippet"] + if self.include_statistics: + parts.append("statistics") + if self.include_content_details: + parts.append("contentDetails") + + # Get video information + video_response = youtube.videos().list(part=",".join(parts), id=video_id).execute() + + if not video_response["items"]: + return DataFrame(pd.DataFrame({"error": ["Video not found"]})) + + video_info = video_response["items"][0] + snippet = video_info["snippet"] + + # Build video data dictionary + video_data = { + "video_id": [video_id], + "url": [f"https://www.youtube.com/watch?v={video_id}"], + "title": [snippet["title"]], + "description": [snippet["description"]], + "published_at": [snippet["publishedAt"]], + "channel_id": [snippet["channelId"]], + "channel_title": [snippet["channelTitle"]], + "category_id": [snippet.get("categoryId", "Unknown")], + "live_broadcast_content": [snippet.get("liveBroadcastContent", "none")], 
+ } + + # Add thumbnails if requested + if self.include_thumbnails: + for size, thumb in snippet["thumbnails"].items(): + video_data[f"thumbnail_{size}_url"] = [thumb["url"]] + video_data[f"thumbnail_{size}_width"] = [thumb.get("width", 0)] + video_data[f"thumbnail_{size}_height"] = [thumb.get("height", 0)] + + # Add tags if requested + if self.include_tags and "tags" in snippet: + video_data["tags"] = [", ".join(snippet["tags"])] + video_data["tags_count"] = [len(snippet["tags"])] + + # Add statistics if requested + if self.include_statistics and "statistics" in video_info: + stats = video_info["statistics"] + video_data.update( + { + "view_count": [int(stats.get("viewCount", 0))], + "like_count": [int(stats.get("likeCount", 0))], + "favorite_count": [int(stats.get("favoriteCount", 0))], + "comment_count": [int(stats.get("commentCount", 0))], + } + ) + + # Add content details if requested + if self.include_content_details and "contentDetails" in video_info: + content_details = video_info["contentDetails"] + video_data.update( + { + "duration": [self._format_duration(content_details["duration"])], + "dimension": [content_details.get("dimension", "2d")], + "definition": [content_details.get("definition", "hd").upper()], + "has_captions": [content_details.get("caption", "false") == "true"], + "licensed_content": [content_details.get("licensedContent", False)], + "projection": [content_details.get("projection", "rectangular")], + "has_custom_thumbnails": [content_details.get("hasCustomThumbnail", False)], + } + ) + + # Add content rating if available + if "contentRating" in content_details: + rating_info = content_details["contentRating"] + video_data["content_rating"] = [str(rating_info)] + + # Create DataFrame with organized columns + video_df = pd.DataFrame(video_data) + + # Organize columns in logical groups + basic_cols = [ + "video_id", + "title", + "url", + "channel_id", + "channel_title", + "published_at", + "category_id", + "live_broadcast_content", + "description", + ] + + stat_cols = ["view_count", "like_count", "favorite_count", "comment_count"] + + content_cols = [ + "duration", + "dimension", + "definition", + "has_captions", + "licensed_content", + "projection", + "has_custom_thumbnails", + "content_rating", + ] + + tag_cols = ["tags", "tags_count"] + + thumb_cols = [col for col in video_df.columns if col.startswith("thumbnail_")] + + # Reorder columns based on what's included + ordered_cols = basic_cols[:] + + if self.include_statistics: + ordered_cols.extend([col for col in stat_cols if col in video_df.columns]) + + if self.include_content_details: + ordered_cols.extend([col for col in content_cols if col in video_df.columns]) + + if self.include_tags: + ordered_cols.extend([col for col in tag_cols if col in video_df.columns]) + + if self.include_thumbnails: + ordered_cols.extend(sorted(thumb_cols)) + + # Add any remaining columns + remaining_cols = [col for col in video_df.columns if col not in ordered_cols] + ordered_cols.extend(remaining_cols) + + return DataFrame(video_df[ordered_cols]) + + except (HttpError, googleapiclient.errors.HttpError) as e: + error_message = f"YouTube API error: {e!s}" + if e.resp.status == self.API_FORBIDDEN: + error_message = "API quota exceeded or access forbidden." + elif e.resp.status == self.VIDEO_NOT_FOUND: + error_message = "Video not found." 
+
+            return DataFrame(pd.DataFrame({"error": [error_message]}))
+
+        except KeyError as e:
+            return DataFrame(pd.DataFrame({"error": [str(e)]}))
diff --git a/src/backend/base/langflow/components/youtube/youtube_transcripts.py b/src/backend/base/langflow/components/youtube/youtube_transcripts.py
new file mode 100644
index 000000000000..73eeb012a15b
--- /dev/null
+++ b/src/backend/base/langflow/components/youtube/youtube_transcripts.py
@@ -0,0 +1,85 @@
+import pandas as pd
+import youtube_transcript_api
+from langchain_community.document_loaders import YoutubeLoader
+from langchain_community.document_loaders.youtube import TranscriptFormat
+
+from langflow.custom import Component
+from langflow.inputs import DropdownInput, IntInput, MultilineInput
+from langflow.schema import DataFrame, Message
+from langflow.template import Output
+
+
+class YouTubeTranscriptsComponent(Component):
+    """A component that extracts spoken content from YouTube videos as transcripts."""
+
+    display_name: str = "YouTube Transcripts"
+    description: str = "Extracts spoken content from YouTube videos with both DataFrame and text output options."
+    icon: str = "YouTube"
+    name = "YouTubeTranscripts"
+
+    inputs = [
+        MultilineInput(
+            name="url",
+            display_name="Video URL",
+            info="Enter the YouTube video URL to get transcripts from.",
+            tool_mode=True,
+            required=True,
+        ),
+        IntInput(
+            name="chunk_size_seconds",
+            display_name="Chunk Size (seconds)",
+            value=60,
+            info="The size of each transcript chunk in seconds.",
+        ),
+        DropdownInput(
+            name="translation",
+            display_name="Translation Language",
+            advanced=True,
+            options=["", "en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "hi", "ar", "id"],
+            info="Translate the transcripts to the specified language. Leave empty for no translation.",
+        ),
+    ]
+
+    outputs = [
+        Output(name="dataframe", display_name="Chunks", method="get_dataframe_output"),
+        Output(name="message", display_name="Transcript", method="get_message_output"),
+    ]
+
+    def _load_transcripts(self, *, as_chunks: bool = True):
+        """Internal method to load transcripts from YouTube."""
+        loader = YoutubeLoader.from_youtube_url(
+            self.url,
+            transcript_format=TranscriptFormat.CHUNKS if as_chunks else TranscriptFormat.TEXT,
+            chunk_size_seconds=self.chunk_size_seconds,
+            translation=self.translation or None,
+        )
+        return loader.load()
+
+    def get_dataframe_output(self) -> DataFrame:
+        """Provides transcript output as a DataFrame with timestamp and text columns."""
+        try:
+            transcripts = self._load_transcripts(as_chunks=True)
+
+            # Create DataFrame with timestamp and text columns
+            data = []
+            for doc in transcripts:
+                start_seconds = int(doc.metadata["start_seconds"])
+                start_minutes = start_seconds // 60
+                start_seconds %= 60
+                timestamp = f"{start_minutes:02d}:{start_seconds:02d}"
+                data.append({"timestamp": timestamp, "text": doc.page_content})
+            return DataFrame(pd.DataFrame(data))
+
+        except (youtube_transcript_api.TranscriptsDisabled, youtube_transcript_api.NoTranscriptFound) as exc:
+            return DataFrame(pd.DataFrame({"error": [f"Failed to get YouTube transcripts: {exc!s}"]}))
+
+    def get_message_output(self) -> Message:
+        """Provides transcript output as continuous text."""
+        try:
+            transcripts = self._load_transcripts(as_chunks=False)
+            result = transcripts[0].page_content
+            return Message(text=result)
+
+        except (youtube_transcript_api.TranscriptsDisabled, youtube_transcript_api.NoTranscriptFound) as exc:
+            error_msg = f"Failed to get YouTube transcripts: {exc!s}"
+            return Message(text=error_msg)
diff --git a/src/frontend/src/constants/constants.ts b/src/frontend/src/constants/constants.ts
index 10e11ff739ab..f70d2bb6364b 100644
--- a/src/frontend/src/constants/constants.ts
+++ b/src/frontend/src/constants/constants.ts
@@ -742,6 +742,8 @@ export const BUNDLES_SIDEBAR_FOLDER_NAMES = [
   "assemblyai",
   "LangWatch",
   "langwatch",
+  "Youtube",
+  "youtube",
 ];
 
 export const AUTHORIZED_DUPLICATE_REQUESTS = [
diff --git a/src/frontend/src/utils/styleUtils.ts b/src/frontend/src/utils/styleUtils.ts
index b8e0ddaf66b7..b53fe4e04f7a 100644
--- a/src/frontend/src/utils/styleUtils.ts
+++ b/src/frontend/src/utils/styleUtils.ts
@@ -535,6 +535,7 @@ export const SIDEBAR_BUNDLES = [
   { display_name: "Git", name: "git", icon: "GitLoader" },
   { display_name: "Confluence", name: "confluence", icon: "Confluence" },
   { display_name: "Mem0", name: "mem0", icon: "Mem0" },
+  { display_name: "Youtube", name: "youtube", icon: "Youtube" },
 ];
 
 export const categoryIcons = {
diff --git a/src/frontend/tests/extended/integrations/youtube-transcripts.spec.ts b/src/frontend/tests/extended/integrations/youtube-transcripts.spec.ts
index 9fed71a6f79e..1ef0b5dd36fc 100644
--- a/src/frontend/tests/extended/integrations/youtube-transcripts.spec.ts
+++ b/src/frontend/tests/extended/integrations/youtube-transcripts.spec.ts
@@ -11,15 +11,11 @@ test(
     await page.getByTestId("sidebar-search-input").click();
     await page.getByTestId("sidebar-search-input").fill("youtube");
 
-    await page.waitForSelector('[id="toolsYouTube Transcripts"]', {
-      timeout: 3000,
-    });
+    await page.waitForTimeout(2000);
+
+    await page.getByTestId("youtubeYouTube Transcripts").hover();
+    await page.getByTestId("add-component-button-youtube-transcripts").click();
 
-    await page
-      .locator('//*[@id="toolsYouTube Transcripts"]')
-      .dragTo(page.locator('//*[@id="react-flow-id"]'));
-    await page.mouse.up();
-    await page.mouse.down();
     await page.getByTestId("fit_view").click();
 
     let outdatedComponents = await page
@@ -35,16 +31,15 @@
       .getByTestId("textarea_str_url")
      .fill("https://www.youtube.com/watch?v=VqhCQZaH4Vs");
 
-    await page.getByTestId("button_run_youtube transcripts").click();
+    await page.getByTestId("fit_view").click();
 
-    await page.waitForSelector("text=built successfully", { timeout: 30000 });
+    await page.getByTestId("button_run_youtube transcripts").click();
 
-    await page.getByTestId("output-inspection-transcription").first().click();
+    await page.waitForSelector("text=built successfully", { timeout: 300000 });
 
+    await page.getByTestId("output-inspection-transcript").first().click();
     await page.waitForSelector("text=Component Output", { timeout: 30000 });
 
-    await page.getByRole("gridcell").first().click();
-
     const value = await page.getByPlaceholder("Empty").inputValue();
     expect(value.length).toBeGreaterThan(10);
   },