diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 891f008..e635f14 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -26,10 +26,13 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest + python -m pip install flake8 - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 -v + - name: Test with unittest + run: | + python -m unittest diff --git a/.gitignore b/.gitignore index e51142b..44fe5d4 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,4 @@ instagramy/core/__pycache__ .instagramy_cache instagramy/plugins/__pycache__ .old -methos.md instagramy/plugins/__pycache__ diff --git a/README.md b/README.md index d97e7ac..a8e4df2 100644 --- a/README.md +++ b/README.md @@ -23,8 +23,9 @@ Code style GitHub Repo size - Actions - Actions + GitHub Actions + GitHub Actions +

@@ -42,6 +43,7 @@ Scrape Instagram Users Information, Posts Details, and Hashtags details. This Pa - Download [Instagram post](#Plugins-for-Downloading-Posts) and [User profile picture](#Plugins-for-Downloading-Posts) - Have some [plugins](#Plugins) for Data analysis - No External dependencies +- Has a [caching feature](#Caching-Feature) - Lightweight - Easy to Use @@ -81,7 +83,7 @@ For Login into Instagram via instagramy session id is required. No username or p **Note:** Check for session id frequently, It may be changed by Instagram - + ### Instagram User details @@ -103,6 +105,19 @@ True >>> user.user_data # More data about user as dict ``` +Once you have fetched a user's data, instagramy stores it in a cache file to help avoid errors on later requests. You can then load the data from the cache instead; in that case you do not need to provide the sessionid. + +```python +>>> from instagramy import InstagramUser + +>>> user = InstagramUser('google', from_cache=True) + +>>> user.is_verified +True +``` + +This option is available for all classes: `InstagramUser`, `InstagramHashTag` and `InstagramPost`. +
Show all Properties

@@ -206,6 +221,7 @@ Class `InstagramPost` scrape some of the information related to the particular p - number_of_comments - number_of_likes - post_source +- text - type_of_post - upload_time @@ -281,6 +297,21 @@ You can use this package without login. Sessionid is not required but it may ris >>> tag.tag_data ``` +### Caching Feature + +Version `4.3` adds a caching feature. Once you have fetched a user's data, instagramy stores it in a JSON cache file to help avoid errors on later requests. You can then load the data from the cache instead, without providing the sessionid: pass the optional parameter `from_cache=True` in its place. + +```python +>>> from instagramy import InstagramUser + +>>> user = InstagramUser('google', from_cache=True) + +>>> user.is_verified +True +``` + +This option is available for all classes: `InstagramUser`, `InstagramHashTag` and `InstagramPost`. + ## Sample Scripts You can get some Sample scripts [Here](https://yogeshwaran01.herokuapp.com/post/Instagramy-Python-Package-for-Instagram) diff --git a/instagramy/InstagramHashTag.py b/instagramy/InstagramHashTag.py index e16419f..b1e7dbd 100644 --- a/instagramy/InstagramHashTag.py +++ b/instagramy/InstagramHashTag.py @@ -9,50 +9,71 @@ ------------- :: - from instagramy.InstagramHashtag import InstagramHashtag + >>> from instagramy.InstagramHashtag import InstagramHashtag >>> tag = InstagramHashtag('python') >>> tag.number_of_posts >>> tag.top_posts """ -from datetime import datetime -from collections import namedtuple from .core.parser import Parser from .core.parser import Viewer +from .core.parser import TagParser from .core.exceptions import HashTagNotFound from .core.exceptions import RedirectionError from .core.exceptions import HTTPError +from .core.cache import Cache from .core.requests import get -class InstagramHashTag: - """ - Class InstagramHashTag scrapes instagram hashtag information +class InstagramHashTag(TagParser): + r""" + Scrapes instagram hashtag information + + 
def __init__(self, tag: str, sessionid=None, from_cache=False):
    """
    Load the hashtag data from the instagramy cache or from Instagram.

    :param tag: Name of the Instagram Hashtag
    :param sessionid (optional): Session id of Instagram which is in browser cookies
    :param from_cache (optional): Use the cached data for ``tag`` when a cache
        entry exists; otherwise the data is fetched and the cache is written
    :raises RedirectionError: when the fetched page does not contain the
        hashtag payload
    """
    self.url = f"https://www.instagram.com/explore/tags/{tag}/"
    self.sessionid = sessionid
    # Stays None unless a page is fetched with a session id (see below).
    self.viewer = None

    cache = Cache("tag")
    if from_cache and cache.is_exists(tag):
        # Cache hit: no page was fetched, so there is no viewer data to read.
        self.tag_data = cache.read_cache(tag)
        return

    data = self.get_json()
    # Extract the payload once, inside the guard, so that a page without the
    # expected keys is reported as RedirectionError and nothing is cached.
    try:
        self.tag_data = data["entry_data"]["TagPage"][0]["graphql"]["hashtag"]
    except KeyError:
        raise RedirectionError
    cache.make_cache(tag, self.tag_data)

    if sessionid:
        self.viewer = Viewer(data=data["config"]["viewer"])
int: - """ No.of posts in given Hashtag """ - return self.tag_data["edge_hashtag_to_media"]["count"] - - @property - def top_posts(self) -> list: - """ - Top post data (<70) in the given Hashtag - """ - - post_lists = [] - nodes = self.tag_data["edge_hashtag_to_media"]["edges"] - for node in nodes: - data = {} - try: - data["likes"] = node["node"]["edge_liked_by"]["count"] - except (KeyError, TypeError): - data["likes"] = None - try: - data["comments"] = node["node"]["edge_media_to_comment"]["count"] - except (KeyError, TypeError): - data["comments"] = None - try: - data["is_video"] = node["node"]["is_video"] - except (KeyError, TypeError): - data["is_video"] = None - try: - data["upload_time"] = datetime.fromtimestamp( - node["node"]["taken_at_timestamp"] - ) - except (KeyError, TypeError): - data["upload_time"] = None - try: - data["caption"] = node["node"]["accessibility_caption"] - except (KeyError, TypeError): - data["caption"] = None - try: - data["shortcode"] = node["node"]["shortcode"] - except (KeyError, TypeError): - data["shortcode"] = None - try: - data[ - "post_url" - ] = f'https://www.instagram.com/p/{node["node"]["shortcode"]}' - except (KeyError, TypeError): - data["post_url"] = None - try: - data["display_url"] = node["node"]["display_url"] - except (KeyError, TypeError): - data["display_url"] = None - nt = namedtuple("Post", data.keys())(*data.values()) - post_lists.append(nt) - return post_lists - - @property - def posts_display_urls(self) -> list: - """ - Top post (<70) in the given Hashtag - """ - return [i["display_url"] for i in self.top_posts] - def __repr__(self) -> str: return f"{self.__class__.__name__}('{self.tagname}')" diff --git a/instagramy/InstagramPost.py b/instagramy/InstagramPost.py index b2b8ed3..6081254 100644 --- a/instagramy/InstagramPost.py +++ b/instagramy/InstagramPost.py @@ -9,7 +9,7 @@ ------------- :: - from instagramy.InstagramHashtag import InstagramPost + >>> from instagramy.InstagramHashtag import InstagramPost >>> 
def __init__(self, post_id: str, sessionid=None, from_cache=False):
    """
    Load the post data from the instagramy cache or from Instagram.

    :param post_id: Id of the Instagram post (From url of the post)
    :param sessionid (optional): Session id of Instagram which is in browser cookies
    :param from_cache (optional): Use the cached data for ``post_id`` when a
        cache entry exists; otherwise the data is fetched and the cache is written
    :raises RedirectionError: when the fetched page does not contain the
        post payload
    """
    self.post_id = post_id
    self.url = f"https://www.instagram.com/p/{post_id}/"
    self.sessionid = sessionid
    # Stays None unless a page is fetched with a session id (see below).
    self.viewer = None

    cache = Cache("post")
    if from_cache and cache.is_exists(post_id):
        # Cache hit: no page was fetched, so there is no viewer data to read.
        self.post_data = cache.read_cache(post_id)
        return

    data = self.get_json()
    # Extract the payload once, inside the guard, so that a page without the
    # expected keys is reported as RedirectionError and nothing is cached.
    try:
        self.post_data = data["entry_data"]["PostPage"][0]["graphql"][
            "shortcode_media"
        ]
    except KeyError:
        raise RedirectionError
    cache.make_cache(post_id, self.post_data)

    if sessionid:
        self.viewer = Viewer(data=data["config"]["viewer"])
instagramy.InstagramUser import InstagramUser >>> user = InstagramUser('github') >>> user.is_verified @@ -17,20 +17,25 @@ >>> user.biography """ -from datetime import datetime -from collections import namedtuple from .core.parser import Parser from .core.parser import Viewer +from .core.parser import UserParser from .core.exceptions import UsernameNotFound from .core.exceptions import RedirectionError from .core.exceptions import HTTPError +from .core.cache import Cache from .core.requests import get -class InstagramUser: - """ - Class InstagramUser scrapes instagram user information +class InstagramUser(UserParser): + r""" + Scrapes instagram user information. + + :param username: Username of the Instagram user + :param sessionid (optional): Session id of Instagram which is in browser cookies + :param from_cache (optional): Get data from the cache of instagramy not from instagram + >>> instagram_user = InstagramUser("github") >>> instagram_user.is_verified True @@ -38,24 +43,40 @@ class InstagramUser: 'Built for developers.' 
""" - def __init__(self, username: str, sessionid=None): + def __init__(self, username: str, sessionid=None, from_cache=False): self.url = f"https://www.instagram.com/{username}/" self.sessionid = sessionid - data = self.get_json() - try: - self.user_data = data["entry_data"]["ProfilePage"][0]["graphql"]["user"] - except KeyError: - raise RedirectionError + cache = Cache("user") + if from_cache: + if cache.is_exists(username): + self.user_data = cache.read_cache(username) + else: + data = self.get_json() + cache.make_cache( + username, data["entry_data"]["ProfilePage"][0]["graphql"]["user"] + ) + self.user_data = data["entry_data"]["ProfilePage"][0]["graphql"]["user"] + else: + data = self.get_json() + cache.make_cache( + username, data["entry_data"]["ProfilePage"][0]["graphql"]["user"] + ) + try: + self.user_data = data["entry_data"]["ProfilePage"][0]["graphql"]["user"] + except KeyError: + raise RedirectionError if sessionid: - self.viewer = Viewer(data=data["config"]["viewer"]) + try: + self.viewer = Viewer(data=data["config"]["viewer"]) + except UnboundLocalError: + self.viewer = None else: self.viewer = None def get_json(self) -> dict: - """ - Return a dict of user information - """ + """ Get user information from Instagram """ + try: html = get(self.url, sessionid=self.sessionid) except HTTPError: @@ -65,203 +86,6 @@ def get_json(self) -> dict: parser.feed(html) return parser.Data - @property - def username(self) -> str: - """ Username of the given user """ - return self.user_data["username"] - - @property - def fullname(self) -> str: - """ Fullname of the given user """ - return self.user_data["full_name"] - - @property - def biography(self) -> str: - """ Biography of the given user """ - return self.user_data["biography"] - - @property - def website(self) -> str: - """ Website of the given user """ - return self.user_data["external_url"] - - @property - def number_of_followers(self) -> int: - """ No.of Followers of the given user """ - return 
self.user_data["edge_followed_by"]["count"] - - @property - def number_of_followings(self) -> int: - """ No.of Following of the given user """ - return self.user_data["edge_follow"]["count"] - - @property - def number_of_posts(self) -> int: - """ No.of Post of the given user """ - return self.user_data["edge_owner_to_timeline_media"]["count"] - - @property - def profile_picture_url(self) -> str: - """ Profile picture url of the Given User """ - return self.user_data["profile_pic_url_hd"] - - @property - def is_verified(self) -> bool: - """ Verification status of the user """ - return self.user_data["is_verified"] - - @property - def is_private(self) -> bool: - """ Account type is Private """ - return self.user_data["is_private"] - - @property - def posts(self) -> list: - """ - Top 12 posts data of the given user - """ - - posts_lists = [] - posts_details = self.user_data["edge_owner_to_timeline_media"]["edges"] - for i in posts_details: - data = {} - try: - data["likes"] = i["node"]["edge_liked_by"]["count"] - except (KeyError, TypeError): - data["likes"] = None - try: - data["comments"] = i["node"]["edge_media_to_comment"]["count"] - except (KeyError, TypeError): - data["comments"] = None - try: - data["caption"] = i["node"]["accessibility_caption"] - except (KeyError, TypeError): - data["caption"] = None - try: - data["is_video"] = i["node"]["is_video"] - except (KeyError, TypeError): - data["is_video"] = None - try: - data["timestamp"] = i["node"]["taken_at_timestamp"] - except (KeyError, TypeError): - data["timestamp"] = None - try: - data["location"] = i["node"]["location"] - except (KeyError, TypeError): - data["location"] = None - try: - data["shortcode"] = i["node"]["shortcode"] - except (KeyError, TypeError): - data["shortcode"] = None - try: - data[ - "post_url" - ] = f'https://www.instagram.com/p/{i["node"]["shortcode"]}/' - except (KeyError, TypeError): - data["post_url"] = None - try: - data["display_url"] = i["node"]["display_url"] - except (KeyError, 
TypeError): - data["display_url"] = None - - if i["node"]["is_video"]: - data["video_url"] = i["node"]["video_url"] - data["video_view_count"] = i["node"]["video_view_count"] - if i["node"]["is_video"]: - data["post_source"] = i["node"]["video_url"] - else: - data["post_source"] = i["node"]["display_url"] - - try: - data["taken_at_timestamp"] = datetime.fromtimestamp( - i["node"]["taken_at_timestamp"] - ) - except (KeyError, TypeError): - data["taken_at_timestamp"] = None - nt = namedtuple("Post", data.keys())(*data.values()) - posts_lists.append(nt) - return posts_lists - - @property - def posts_display_urls(self) -> list: - """ - Top 12 posts picture url of the given user - """ - - return [i["display_url"] for i in self.posts] - - @property - def is_joined_recently(self) -> bool: - """ Is user joined recently """ - return self.user_data["is_joined_recently"] - - @property - def other_info(self) -> dict: - """ - Other information about user - """ - return { - "is_private": self.user_data["is_private"], - "is_verified": self.user_data["is_verified"], - "is_business_account": self.user_data["is_business_account"], - "is_joined_recently": self.user_data["is_joined_recently"], - "has_ar_effects": self.user_data["has_ar_effects"], - "has_clips": self.user_data["has_clips"], - "has_guides": self.user_data["has_guides"], - "has_channel": self.user_data["has_channel"], - "highlight_reel_count": self.user_data["highlight_reel_count"], - } - - @property - def follows_viewer(self) -> bool: - """ Is user follows the Viewer """ - return self.user_data["follows_viewer"] - - @property - def has_blocked_viewer(self) -> bool: - """ Is user blocked the Viewer """ - return self.user_data["has_blocked_viewer"] - - @property - def no_of_mutual_follower(self) -> bool: - """ No of Mutual Followers """ - return self.user_data["edge_mutual_followed_by"]["count"] - - @property - def requested_by_viewer(self) -> bool: - """ Is viewer requested to follow user """ - return 
self.user_data["requested_by_viewer"] - - @property - def is_blocked_by_viewer(self) -> bool: - """ Is Viewer blocked the User """ - return self.user_data["blocked_by_viewer"] - - @property - def restricted_by_viewer(self) -> bool: - """ Is Viewer restricted the User """ - return self.user_data["restricted_by_viewer"] - - @property - def has_country_block(self) -> bool: - """ Is country blocked the User """ - return self.user_data["country_block"] - - @property - def followed_by_viewer(self) -> bool: - """ Is Viewer Follows the User """ - return self.user_data["followed_by_viewer"] - - @property - def has_requested_viewer(self) -> bool: - """ Is User requested the Viewer """ - return self.user_data["has_requested_viewer"] - - @property - def connected_fb_page(self) -> bool: - """ Connected Facebook page of User """ - return self.user_data["connected_fb_page"] - def __str__(self) -> str: return f"{self.fullname} ({self.username}) -> {self.biography}" diff --git a/instagramy/__init__.py b/instagramy/__init__.py index 709f1e8..af5d33c 100644 --- a/instagramy/__init__.py +++ b/instagramy/__init__.py @@ -12,7 +12,7 @@ __package__ = "instagramy" __description__ = "A python package for Instagram. It scarpe the Instagram contents." 
cache_dir = ".instagramy_cache"


class Cache:
    """
    Read and write JSON cache files inside ``cache_dir``.

    Each entry is stored as ``<name>_<key>.json``, where ``key`` is the
    category given to the constructor (the callers use "user", "post"
    and "tag").
    """

    def __init__(self, key: str):
        """
        Remember the category and create the cache directory if needed.

        :param key: category suffix used in every file name of this cache
        """
        self.key = key
        if not os.path.isdir(cache_dir):
            # exist_ok avoids a crash when another process creates the
            # directory between the isdir() check and this call.
            os.makedirs(cache_dir, exist_ok=True)
            tag_path = os.path.join(cache_dir, "CACHEDIR.TAG")
            with open(tag_path, "w", encoding="utf-8") as file:
                # The signature line is what makes this file a valid cache
                # directory tag; without it the tag is not recognised.
                file.write(
                    "Signature: 8a477f597d28d172789f06886806bc55\n"
                    "# This file is a cache directory tag created by instagramy."
                    "\n"
                )

    def _path(self, name: str) -> str:
        """Return the path of the cache file for ``name``."""
        return os.path.join(cache_dir, f"{name}_{self.key}.json")

    def is_exists(self, name: str) -> bool:
        """Return True when a cache file for ``name`` exists."""
        return os.path.isfile(self._path(name))

    def make_cache(self, name: str, data: dict):
        """Write ``data`` as JSON to the cache file for ``name``, replacing it."""
        with open(self._path(name), "w", encoding="utf-8") as file:
            json.dump(data, file)

    def read_cache(self, name: str) -> dict:
        """Return the JSON content of the cache file for ``name``."""
        with open(self._path(name), "r", encoding="utf-8") as file:
            return json.load(file)


def list_caches() -> list:
    """
    List of all Cache files created by instagramy in current dir

    Returns an empty list when the cache directory does not exist.
    """
    try:
        return os.listdir(cache_dir)
    except FileNotFoundError:
        return []


def clear_caches() -> None:
    """ Clear all Caches created by instagramy in current dir """

    shutil.rmtree(cache_dir, ignore_errors=True)
f"{self.__class__.__name__}('{self.username}')" + + +class UserParser: + """ Parse the required data of user store as property""" + + @property + def username(self) -> str: + """ Username of the given user """ + return self.user_data["username"] + + @property + def fullname(self) -> str: + """ Fullname of the given user """ + return self.user_data["full_name"] + + @property + def biography(self) -> str: + """ Biography of the given user """ + return self.user_data["biography"] + + @property + def website(self) -> str: + """ Website of the given user """ + return self.user_data["external_url"] + + @property + def number_of_followers(self) -> int: + """ No.of Followers of the given user """ + return self.user_data["edge_followed_by"]["count"] + + @property + def number_of_followings(self) -> int: + """ No.of Following of the given user """ + return self.user_data["edge_follow"]["count"] + + @property + def number_of_posts(self) -> int: + """ No.of Post of the given user """ + return self.user_data["edge_owner_to_timeline_media"]["count"] + + @property + def profile_picture_url(self) -> str: + """ Profile picture url of the Given User """ + return self.user_data["profile_pic_url_hd"] + + @property + def is_verified(self) -> bool: + """ Verification status of the user """ + return self.user_data["is_verified"] + + @property + def is_private(self) -> bool: + """ Account type is Private """ + return self.user_data["is_private"] + + @property + def posts(self) -> list: + """ + Top 12 posts data of the given user + """ + + posts_lists = [] + posts_details = self.user_data["edge_owner_to_timeline_media"]["edges"] + for i in posts_details: + data = {} + try: + data["likes"] = i["node"]["edge_liked_by"]["count"] + except (KeyError, TypeError): + data["likes"] = None + try: + data["comments"] = i["node"]["edge_media_to_comment"]["count"] + except (KeyError, TypeError): + data["comments"] = None + try: + data["caption"] = i["node"]["accessibility_caption"] + except (KeyError, 
TypeError): + data["caption"] = None + try: + data["is_video"] = i["node"]["is_video"] + except (KeyError, TypeError): + data["is_video"] = None + try: + data["timestamp"] = i["node"]["taken_at_timestamp"] + except (KeyError, TypeError): + data["timestamp"] = None + try: + data["location"] = i["node"]["location"] + except (KeyError, TypeError): + data["location"] = None + try: + data["shortcode"] = i["node"]["shortcode"] + except (KeyError, TypeError): + data["shortcode"] = None + try: + data[ + "post_url" + ] = f'https://www.instagram.com/p/{i["node"]["shortcode"]}/' + except (KeyError, TypeError): + data["post_url"] = None + try: + data["display_url"] = i["node"]["display_url"] + except (KeyError, TypeError): + data["display_url"] = None + + if i["node"]["is_video"]: + data["video_url"] = i["node"]["video_url"] + data["video_view_count"] = i["node"]["video_view_count"] + if i["node"]["is_video"]: + data["post_source"] = i["node"]["video_url"] + else: + data["post_source"] = i["node"]["display_url"] + + try: + data["taken_at_timestamp"] = datetime.fromtimestamp( + i["node"]["taken_at_timestamp"] + ) + except (KeyError, TypeError): + data["taken_at_timestamp"] = None + nt = namedtuple("Post", data.keys())(*data.values()) + posts_lists.append(nt) + return posts_lists + + @property + def posts_display_urls(self) -> list: + """ + Top 12 posts picture url of the given user + """ + + return [i["display_url"] for i in self.posts] + + @property + def is_joined_recently(self) -> bool: + """ Is user joined recently """ + return self.user_data["is_joined_recently"] + + @property + def other_info(self) -> dict: + """ + Other information about user + """ + return { + "is_private": self.user_data["is_private"], + "is_verified": self.user_data["is_verified"], + "is_business_account": self.user_data["is_business_account"], + "is_joined_recently": self.user_data["is_joined_recently"], + "has_ar_effects": self.user_data["has_ar_effects"], + "has_clips": self.user_data["has_clips"], 
+ "has_guides": self.user_data["has_guides"], + "has_channel": self.user_data["has_channel"], + "highlight_reel_count": self.user_data["highlight_reel_count"], + } + + @property + def follows_viewer(self) -> bool: + """ Is user follows the Viewer """ + return self.user_data["follows_viewer"] + + @property + def has_blocked_viewer(self) -> bool: + """ Is user blocked the Viewer """ + return self.user_data["has_blocked_viewer"] + + @property + def no_of_mutual_follower(self) -> bool: + """ No of Mutual Followers """ + return self.user_data["edge_mutual_followed_by"]["count"] + + @property + def requested_by_viewer(self) -> bool: + """ Is viewer requested to follow user """ + return self.user_data["requested_by_viewer"] + + @property + def is_blocked_by_viewer(self) -> bool: + """ Is Viewer blocked the User """ + return self.user_data["blocked_by_viewer"] + + @property + def restricted_by_viewer(self) -> bool: + """ Is Viewer restricted the User """ + return self.user_data["restricted_by_viewer"] + + @property + def has_country_block(self) -> bool: + """ Is country blocked the User """ + return self.user_data["country_block"] + + @property + def followed_by_viewer(self) -> bool: + """ Is Viewer Follows the User """ + return self.user_data["followed_by_viewer"] + + @property + def has_requested_viewer(self) -> bool: + """ Is User requested the Viewer """ + return self.user_data["has_requested_viewer"] + + @property + def connected_fb_page(self) -> bool: + """ Connected Facebook page of User """ + return self.user_data["connected_fb_page"] + + +class PostParser: + """ Parse the required data of post store as property""" + + @property + def type_of_post(self) -> str: + """ Type of the Post""" + return self.post_data["__typename"] + + @property + def display_url(self) -> str: + """ Display url of the Image/Video """ + return self.post_data["display_url"] + + @property + def upload_time(self) -> datetime: + """ Upload Datetime of the Post """ + return 
datetime.fromtimestamp(self.post_data["taken_at_timestamp"]) + + @property + def number_of_likes(self) -> int: + """ No.of Like is given post """ + return int(self.post_data["edge_media_preview_like"]["count"]) + + @property + def number_of_comments(self) -> int: + """ No.of Comments is given post """ + return int(self.post_data["edge_media_to_parent_comment"]["count"]) + + @property + def author(self) -> str: + """ Author of the Post """ + return self.post_data["owner"]["username"] + + @property + def caption(self) -> str: + """ Caption of the Post """ + return self.post_data["accessibility_caption"] + + @property + def post_source(self) -> str: + """ Post Image/Video Link """ + if self.post_data["is_video"]: + return self.post_data["video_url"] + return self.display_url + + @property + def text(self) -> str: + try: + text = self.post_data["edge_media_to_caption"]["edges"][0]["node"]["text"] + return text + except (KeyError, IndexError): + return None + + +class TagParser: + """ Parse the required data of tag store as property""" + + @property + def tagname(self) -> str: + """ Tagname of the Hagtag """ + return self.tag_data["name"] + + @property + def profile_pic_url(self) -> str: + """ Profile picture url of the Hagtag """ + return self.tag_data["profile_pic_url"] + + @property + def number_of_posts(self) -> int: + """ No.of posts in given Hashtag """ + return self.tag_data["edge_hashtag_to_media"]["count"] + + @property + def top_posts(self) -> list: + """ + Top post data (<70) in the given Hashtag + """ + + post_lists = [] + nodes = self.tag_data["edge_hashtag_to_media"]["edges"] + for node in nodes: + data = {} + try: + data["likes"] = node["node"]["edge_liked_by"]["count"] + except (KeyError, TypeError): + data["likes"] = None + try: + data["comments"] = node["node"]["edge_media_to_comment"]["count"] + except (KeyError, TypeError): + data["comments"] = None + try: + data["is_video"] = node["node"]["is_video"] + except (KeyError, TypeError): + 
data["is_video"] = None + try: + data["upload_time"] = datetime.fromtimestamp( + node["node"]["taken_at_timestamp"] + ) + except (KeyError, TypeError): + data["upload_time"] = None + try: + data["caption"] = node["node"]["accessibility_caption"] + except (KeyError, TypeError): + data["caption"] = None + try: + data["shortcode"] = node["node"]["shortcode"] + except (KeyError, TypeError): + data["shortcode"] = None + try: + data[ + "post_url" + ] = f'https://www.instagram.com/p/{node["node"]["shortcode"]}' + except (KeyError, TypeError): + data["post_url"] = None + try: + data["display_url"] = node["node"]["display_url"] + except (KeyError, TypeError): + data["display_url"] = None + nt = namedtuple("Post", data.keys())(*data.values()) + post_lists.append(nt) + return post_lists + + @property + def posts_display_urls(self) -> list: + """ + Top post (<70) in the given Hashtag + """ + return [i["display_url"] for i in self.top_posts] diff --git a/instagramy/plugins/manual_loading.py b/instagramy/plugins/manual_loading.py new file mode 100644 index 0000000..21424c0 --- /dev/null +++ b/instagramy/plugins/manual_loading.py @@ -0,0 +1,68 @@ +""" + instagramy.plugins.manual_loading + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Parse data of Instagram with manual feeding of Instagram Data in offline mode. + This classes are more useful for data analysis purpose. Store the data in json + file and Parse this data by using this classes instead of sending multiple requests + to the Instagram + + Usage Example + ------------- + :: + >>> import json + >>> from instagramy import InstagramUser + + >>> user = InstagramUser('github') + >>> user_data = user.user_data + + # store data of user in json file + >>> with open('github_user.json', 'w') as file_obj: + ... json.dump(user_data, file_obj) + + >>> from instagramy.plugins.manual_loading import InstagramUser + + # using the stored data + >>> with open('github_user.json', 'r') as file_obj: + ... 
class InstagramUser(UserParser):
    """
    User parser fed with an already-loaded dictionary.

    :param data: dictionary taken from `instagramy.InstagramUser.user_data`;
        stored unchanged as ``self.user_data``
    """

    def __init__(self, data: dict):
        self.user_data = data


class InstagramPost(PostParser):
    """
    Post parser fed with an already-loaded dictionary.

    :param data: dictionary taken from `instagramy.InstagramPost.post_data`;
        stored unchanged as ``self.post_data``
    """

    def __init__(self, data: dict):
        self.post_data = data


class InstagramHashTag(TagParser):
    """
    Hashtag parser fed with an already-loaded dictionary.

    :param data: dictionary taken from `instagramy.InstagramHashTag.tag_data`;
        stored unchanged as ``self.tag_data``
    """

    def __init__(self, data: dict):
        self.tag_data = data
class TestParsers(unittest.TestCase):
    """Check the manual-loading parsers against the sample payloads.

    Uses the module-level ``user_data``, ``post_data`` and ``tag_data``
    objects that this test module loads before the class is defined.
    """

    def test_InstagramUser(self):
        """Properties of the sample user payload have the expected values."""
        user = InstagramUser(user_data)
        self.assertEqual(user.biography, "Built for developers.")
        self.assertIsNone(user.connected_fb_page)
        self.assertFalse(user.followed_by_viewer)
        self.assertFalse(user.follows_viewer)
        self.assertEqual(user.fullname, "GitHub")
        self.assertFalse(user.has_blocked_viewer)
        self.assertFalse(user.has_country_block)
        self.assertFalse(user.is_blocked_by_viewer)
        self.assertFalse(user.is_joined_recently)
        self.assertFalse(user.is_private)
        self.assertTrue(user.is_verified)
        # Counts are exact integers, so compare them exactly rather than
        # with the rounding-based assertAlmostEqual.
        self.assertEqual(user.no_of_mutual_follower, 0)
        self.assertEqual(user.number_of_followers, 139340)
        self.assertEqual(user.number_of_followings, 20)
        self.assertEqual(user.number_of_posts, 182)
        self.assertEqual(user.username, "github")

    def test_InstagramPost(self):
        """Properties of the sample post payload have the expected values."""
        post = InstagramPost(post_data)
        self.assertEqual(post.author, "chilll_memes")
        self.assertEqual(
            post.caption,
            "Photo by CHILL MEMES in MRC Nagar with @memepattarai2.0. May be a meme of 3 people and text that says 'private Hospital nurse Chilll_ memes Govt Hospital nurse'.",
        )
        self.assertEqual(post.number_of_comments, 0)
        self.assertEqual(post.number_of_likes, 21)

    def test_InstagramHashtag(self):
        """Properties of the sample hashtag payload have the expected values."""
        tag = InstagramHashTag(tag_data)
        self.assertEqual(tag.number_of_posts, 3600401)
        self.assertEqual(tag.tagname, "python")