Skip to content

Commit

Permalink
Instagram story support
Browse files Browse the repository at this point in the history
  • Loading branch information
amadejkastelic committed Aug 5, 2023
1 parent f2a043b commit 17e635b
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.vscode/
**/__pycache__
cookies.txt
instagram.sess
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ COPY Pipfile.lock ./
COPY main.py ./
COPY downloader/* ./downloader/
COPY models/* ./models/
COPY cookies.txt ./

RUN pipenv install && pipenv run playwright install chromium && pipenv run playwright install-deps

Expand Down
53 changes: 53 additions & 0 deletions bin/fetch_instagram_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from argparse import ArgumentParser
from glob import glob
from os.path import expanduser
from platform import system
from sqlite3 import OperationalError, connect

try:
from instaloader import ConnectionException, Instaloader
except ModuleNotFoundError:
raise SystemExit("Instaloader not found.\n pip install [--user] instaloader")


def get_cookiefile():
default_cookiefile = {
"Windows": "~/AppData/Roaming/Mozilla/Firefox/Profiles/*/cookies.sqlite",
"Darwin": "~/Library/Application Support/Firefox/Profiles/*/cookies.sqlite",
}.get(system(), "~/.mozilla/firefox/*/cookies.sqlite")
cookiefiles = glob(expanduser(default_cookiefile))
if not cookiefiles:
raise SystemExit("No Firefox cookies.sqlite file found. Use -c COOKIEFILE.")
return cookiefiles[0]


def import_session(cookiefile, sessionfile):
print("Using cookies from {}.".format(cookiefile))
conn = connect(f"file:{cookiefile}?immutable=1", uri=True)
try:
cookie_data = conn.execute(
"SELECT name, value FROM moz_cookies WHERE baseDomain='instagram.com'"
)
except OperationalError:
cookie_data = conn.execute(
"SELECT name, value FROM moz_cookies WHERE host LIKE '%instagram.com'"
)
instaloader = Instaloader(max_connection_attempts=1)
instaloader.context._session.cookies.update(cookie_data)
username = instaloader.test_login()
if not username:
raise SystemExit("Not logged in. Are you logged in successfully in Firefox?")
print("Imported session cookie for {}.".format(username))
instaloader.context.username = username
instaloader.save_session_to_file(sessionfile)


if __name__ == "__main__":
p = ArgumentParser()
p.add_argument("-c", "--cookiefile")
p.add_argument("-f", "--sessionfile")
args = p.parse_args()
try:
import_session(args.cookiefile or get_cookiefile(), args.sessionfile)
except (ConnectionException, OperationalError) as e:
raise SystemExit("Cookie import failed: {}".format(e))
1 change: 0 additions & 1 deletion cookies.txt

This file was deleted.

9 changes: 5 additions & 4 deletions downloader/facebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ class FacebookClient(base.BaseClient):
DOMAINS = ['facebook.com', 'fb.watch']

async def get_post(self) -> post.Post:
if not os.path.exists('cookies.txt'):
raise RuntimeError('cookies.txt missing, please export facebook cookies and place them in the app root')
kwargs = {}
if os.path.exists('cookies.txt'):
kwargs['cookies'] = 'cookies.txt'

fb_post = next(facebook_scraper.get_posts(post_urls=[self.url], cookies='cookies.txt'))
fb_post = next(facebook_scraper.get_posts(post_urls=[self.url], **kwargs))

p = post.Post(
url=self.url,
author=fb_post.get('username'),
description=fb_post.get('text'),
likes=fb_post.get('likes'),
created=fb_post.get('time').astimezone(),
created=fb_post.get('time').astimezone() if 'time' in fb_post else None,
)

if fb_post.get('video'):
Expand Down
45 changes: 44 additions & 1 deletion downloader/instagram.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import io
import os
import requests
import typing
from urllib.parse import urlparse, parse_qs

import instaloader
Expand All @@ -7,18 +10,42 @@
from models import post


class InstagramClientSingleton(object):
INSTANCE: typing.Optional[instaloader.Instaloader] = None

@classmethod
def get_instance(cls) -> typing.Optional[instaloader.Instaloader]:
if cls.INSTANCE:
return cls.INSTANCE

cls.INSTANCE = instaloader.Instaloader(
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0'
)
if os.path.exists('instagram.sess'):
cls.INSTANCE.load_session_from_file(username='amadejkastelic', filename='instagram.sess')

return cls.INSTANCE


class InstagramClient(base.BaseClient):
DOMAINS = ['instagram.com', 'ddinstagram.com']

def __init__(self, url: str):
super(InstagramClient, self).__init__(url=url)
self.client = instaloader.Instaloader()
self.client = InstagramClientSingleton.get_instance()

parsed_url = urlparse(url)
self.id = parsed_url.path.strip('/').split('/')[-1]
self.index = int(parse_qs(parsed_url.query).get('img_index', ['1'])[0]) - 1
self._is_story = '/stories/' in url

async def get_post(self) -> post.Post:
if self._is_story:
return self._get_story()

return self._get_post()

def _get_post(self) -> post.Post:
p = instaloader.Post.from_shortcode(self.client.context, self.id)

match p.typename:
Expand All @@ -43,3 +70,19 @@ async def get_post(self) -> post.Post:
buffer=io.BytesIO(resp.content),
created=p.date_local,
)

def _get_story(self) -> post.Post:
story = instaloader.StoryItem.from_mediaid(self.client.context, int(self.id))
if story.is_video:
url = story.video_url
else:
url = story.url

with requests.get(url=url) as resp:
return post.Post(
url=self.url,
author=story.owner_profile.username,
description=story.caption,
buffer=io.BytesIO(resp.content),
created=story.date_local,
)

0 comments on commit 17e635b

Please sign in to comment.