def get_cookiefile():
    """Locate a Firefox ``cookies.sqlite`` file for the current platform.

    The search pattern is chosen from ``platform.system()``: dedicated
    patterns exist for "Windows" and "Darwin", and every other value falls
    back to ``~/.mozilla/firefox/*/cookies.sqlite``.

    Returns:
        The path of the matching cookie database that was modified most
        recently.

    Raises:
        SystemExit: if no file matches the pattern.
    """
    # Local import: nothing else in the script needs it.
    from os.path import getmtime

    default_cookiefile = {
        "Windows": "~/AppData/Roaming/Mozilla/Firefox/Profiles/*/cookies.sqlite",
        "Darwin": "~/Library/Application Support/Firefox/Profiles/*/cookies.sqlite",
    }.get(system(), "~/.mozilla/firefox/*/cookies.sqlite")
    cookiefiles = glob(expanduser(default_cookiefile))
    if not cookiefiles:
        raise SystemExit("No Firefox cookies.sqlite file found. Use -c COOKIEFILE.")
    # glob() yields matches in arbitrary order, so taking index 0 would pick
    # an unpredictable profile when several exist. Prefer the database that
    # was written last.
    return max(cookiefiles, key=getmtime)
def import_session(cookiefile, sessionfile):
    """Copy Instagram cookies from a Firefox cookie database into a session file.

    Args:
        cookiefile: Path to a Firefox ``cookies.sqlite`` database.
        sessionfile: Value forwarded unchanged to
            ``Instaloader.save_session_to_file``. It is ``None`` when the
            script is run without ``-f``.

    Raises:
        SystemExit: if ``test_login()`` returns a falsy username.
        sqlite3.OperationalError: if both cookie queries fail.
    """
    # Local import: only this function needs it.
    from contextlib import closing

    print("Using cookies from {}.".format(cookiefile))
    # immutable=1 makes SQLite open the file read-only without locking it.
    # closing() guarantees the handle is released; a bare ``with conn`` would
    # only manage the transaction, not close the connection.
    with closing(connect(f"file:{cookiefile}?immutable=1", uri=True)) as conn:
        try:
            cookie_data = conn.execute(
                "SELECT name, value FROM moz_cookies WHERE baseDomain='instagram.com'"
            ).fetchall()
        except OperationalError:
            # Fallback query on ``host``; presumably for cookie databases
            # whose schema has no ``baseDomain`` column.
            cookie_data = conn.execute(
                "SELECT name, value FROM moz_cookies WHERE host LIKE '%instagram.com'"
            ).fetchall()

    # The rows were read eagerly above, so the jar receives a plain list of
    # (name, value) pairs rather than a cursor tied to a live connection.
    loader = Instaloader(max_connection_attempts=1)
    loader.context._session.cookies.update(cookie_data)
    username = loader.test_login()
    if not username:
        raise SystemExit("Not logged in. Are you logged in successfully in Firefox?")
    print("Imported session cookie for {}.".format(username))
    loader.context.username = username
    loader.save_session_to_file(sessionfile)


if __name__ == "__main__":
    p = ArgumentParser()
    p.add_argument("-c", "--cookiefile")
    p.add_argument("-f", "--sessionfile")
    args = p.parse_args()
    try:
        # Without -c, fall back to auto-detecting the Firefox cookie database.
        import_session(args.cookiefile or get_cookiefile(), args.sessionfile)
    except (ConnectionException, OperationalError) as e:
        raise SystemExit("Cookie import failed: {}".format(e))
class InstagramClientSingleton(object):
    """Lazily create and cache one shared ``instaloader.Instaloader``.

    The client is built on the first ``get_instance()`` call and reused by
    every later call. If ``SESSION_FILE`` exists at that moment, a saved
    session is loaded into the client before it is cached.
    """

    # Cached client; stays None until get_instance() has fully built one.
    INSTANCE: typing.Optional[instaloader.Instaloader] = None

    # Configuration kept as class attributes so it can be overridden without
    # editing get_instance().
    USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0'
    SESSION_FILE = 'instagram.sess'
    SESSION_USERNAME = 'amadejkastelic'

    @classmethod
    def get_instance(cls) -> typing.Optional[instaloader.Instaloader]:
        """Return the shared client, creating it on first use.

        Any exception raised while loading the session file propagates and
        nothing is cached, so the next call retries from scratch.
        """
        if cls.INSTANCE is not None:
            return cls.INSTANCE

        # Build into a local first. Publishing to cls.INSTANCE before the
        # session is loaded would leave an unauthenticated client cached if
        # load_session_from_file() raised.
        client = instaloader.Instaloader(user_agent=cls.USER_AGENT)
        if os.path.exists(cls.SESSION_FILE):
            client.load_session_from_file(username=cls.SESSION_USERNAME, filename=cls.SESSION_FILE)

        cls.INSTANCE = client
        return cls.INSTANCE
def _get_story(self) -> post.Post:
    """Fetch the story item identified by ``self.id`` and download its media.

    ``self.id`` is converted with ``int()`` and used as the media id. The
    video URL is downloaded when the item is a video, otherwise the image
    URL.

    Returns:
        A ``post.Post`` carrying the story's author, caption, local
        creation date and the downloaded media bytes.

    Raises:
        ValueError: if ``self.id`` is not an integer string.
        requests.RequestException: if the download times out or the server
            answers with an HTTP error status.
    """
    story = instaloader.StoryItem.from_mediaid(self.client.context, int(self.id))
    url = story.video_url if story.is_video else story.url

    # requests has no default timeout, so a stalled CDN connection would
    # otherwise block forever.
    with requests.get(url=url, timeout=30) as resp:
        # Fail loudly on 4xx/5xx instead of storing an error page as media.
        resp.raise_for_status()
        return post.Post(
            url=self.url,
            author=story.owner_profile.username,
            description=story.caption,
            buffer=io.BytesIO(resp.content),
            created=story.date_local,
        )