import json
import re
import sys
import difflib
import requests
from datetime import datetime
from typing import Any, Callable
from urllib.parse import urlparse
import py_common.log as log
from py_common.util import dig, scraper_args
from py_common.config import get_config
from py_common.types import (
ScrapedScene,
ScrapedMovie,
ScrapedPerformer,
ScrapedStudio,
ScrapedTag,
)
import AyloAPI.domains as domains
from AyloAPI.slugger import slugify
config = get_config(
default="""
# User Agent to use for the requests
user_agent = Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:79.0) Gecko/20100101 Firefox/79.0
# Scrape markers when using 'Scrape with...'
scrape_markers = False
# Minimum similarity ratio to consider a match when searching
minimum_similarity = 0.75
"""
)
def default_postprocess(obj: Any, _) -> Any:
return obj
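
# The postprocess hooks used throughout this module take the scraped object and
# the raw API result, and return the (possibly modified) scraped object.
# A minimal sketch of such a hook (hypothetical wrapper that prefers its own
# domain in scene URLs, per the note in _construct_url below):
#
#   def fix_url(scene: ScrapedScene, raw: dict) -> ScrapedScene:
#       if url := scene.get("url"):
#           scene["url"] = url.replace("realitykings.com", "welivetogether.com")
#       return scene
#
#   scene_from_url("https://...", postprocess=fix_url)
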
## Temporary function to add markers to scenes, remove when/if Stash gets native support
def add_markers(scene_id: str, markers: list[dict]):
from itertools import tee, filterfalse
def partition(pred, iterable):
t1, t2 = tee(iterable)
return list(filter(pred, t2)), list(filterfalse(pred, t1))
from py_common.graphql import callGraphQL
def format_time(seconds: int) -> str:
        if seconds >= 3600:
return f"{seconds // 3600}:{(seconds // 60) % 60:02}:{seconds % 60:02}"
return f"{(seconds // 60) % 60}:{seconds % 60:02}"
raw_tags = callGraphQL("query allTags { allTags { name id aliases } }")
tags = {tag["name"].lower(): tag["id"] for tag in raw_tags["allTags"]}
tags |= {
alias.lower(): tag["id"]
for tag in raw_tags["allTags"]
for alias in tag["aliases"]
}
existing_markers = callGraphQL(
"query FindScene($id: ID!){ findScene(id: $id) { scene_markers { title seconds } } }",
{"id": scene_id},
)["findScene"]["scene_markers"]
valid, invalid = partition(lambda m: m["name"].lower() in tags, markers)
if invalid:
invalid_tags = ", ".join({m["name"] for m in invalid})
log.debug(f"Skipping {len(invalid)} markers, tags do not exist: {invalid_tags}")
log.debug(f"Adding {len(valid)} out of {len(markers)} markers to scene {scene_id}")
create_query = "mutation SceneMarkerCreate($input: SceneMarkerCreateInput!) { sceneMarkerCreate(input: $input) {id}}"
for marker in sorted(valid, key=lambda m: m["seconds"]):
name = marker["name"]
seconds = marker["seconds"]
if any(m["seconds"] == marker["seconds"] for m in existing_markers):
log.debug(
f"Skipping marker '{name}' at {format_time(seconds)} because it already exists"
)
continue
variables = {
"input": {
"title": name,
"primary_tag_id": tags[name.lower()],
"seconds": int(seconds),
"scene_id": scene_id,
"tag_ids": [],
}
}
callGraphQL(create_query, variables)
log.debug(f"Added marker '{name}' at {format_time(seconds)}")

## Network helpers
def __raw_request(url, headers) -> requests.Response:
log.trace(f"Sending GET request to {url}")
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 429:
log.error(
"[REQUEST] 429 Too Many Requests: "
"you have sent too many requests in a given amount of time."
)
sys.exit(1)
# Even a 404 will contain an instance token
return response
def __api_request(url: str, headers: dict) -> dict | None:
result = __raw_request(url, headers)
api_response = result.json()
if isinstance(api_response, list):
api_search_errors = "\n- ".join(
json.dumps(res, indent=None) for res in api_response
)
log.error(f"Errors from API:\n{api_search_errors}")
return None
# with open("api_response.json", "w", encoding="utf-8") as f:
# json.dump(api_response, f, indent=2)
return api_response["result"]
def _create_headers_for(domain: str) -> dict[str, str]:
# If we haven't stored a token we must provide a function to get one
def get_instance_token(url: str) -> str | None:
r = __raw_request(url, {"User-Agent": config.user_agent})
if r and (token := r.cookies.get("instance_token")):
return token
log.error(
f"Failed to get instance_token from '{url}': "
"are you sure this site is in the Aylo network?"
)
api_token = domains.get_token_for(domain, fallback=get_instance_token)
if api_token is None:
log.error(f"Unable to get an API token for '{domain}'")
sys.exit(1)
api_headers = {
"Instance": api_token,
"User-Agent": config.user_agent,
"Origin": f"https://{domain}",
"Referer": f"https://{domain}",
}
return api_headers
def _construct_url(api_result: dict) -> str:
"""
Tries to construct a valid public URL for an API result
This will often result in scene links that point to the parent network site,
so we might want to add wrapper scrapers that can add the correct URL as well
For example, a scene from We Live Together will have an URL for realitykings.com
but that scene is also on welivetogether.com and that might be considered more canonical
"""
brand = api_result["brand"]
type_ = api_result["type"]
id_ = api_result["id"]
slug = slugify(api_result["title"])
return f"https://www.{brand}.com/{type_}/{id_}/{slug}"
def _construct_performer_url(api_result: dict, site: str) -> str:
id_ = api_result["id"]
slug = slugify(api_result["name"])
return f"https://www.{site}.com/model/{id_}/{slug}"

## Helper functions for the objects returned from Aylo's API
def get_studio(api_object: dict) -> ScrapedStudio | None:
studio_name = dig(api_object, "collections", 0, "name")
parent_name = dig(api_object, "brandMeta", ("displayName", "name", "shortName"))
if studio_name:
        if parent_name and parent_name.lower() != studio_name.lower():
return {
"name": studio_name,
"parent": {"name": parent_name},
}
return {"name": studio_name}
elif parent_name:
return {"name": parent_name}
log.error(f"No studio for {api_object['type']} with id {api_object['id']}")
return None
# As documented by AdultSun, these tag IDs appear neutral but are actually
# gendered, so we map them to their gender-specific counterparts
tags_map = {
107: "White Woman",
112: "Black Woman",
113: "European Woman",
121: "Latina Woman",
125: "Black Hair (Female)",
126: "Blond Hair (Female)",
127: "Brown Hair (Female)",
128: "Red Hair (Female)",
215: "Rimming Him",
274: "Rimming Her",
374: "Black Man",
376: "European Man",
377: "Latino Man",
378: "White Man",
379: "Black Hair (Male)",
380: "Blond Hair (Male)",
381: "Brown Hair (Male)",
383: "Red Hair (Male)",
385: "Shaved Head",
386: "Short Hair (Male)",
}
def to_tag(api_object: dict) -> ScrapedTag:
mapped_tag = tags_map.get(api_object["id"], api_object["name"].strip())
return {"name": mapped_tag}
def to_tags(api_object: dict) -> list[ScrapedTag]:
tags = api_object.get("tags", [])
    return [to_tag(x) for x in tags if "name" in x or x.get("id") in tags_map]
def to_marker(api_object: dict) -> dict:
return {
**to_tag(api_object),
"seconds": api_object["startTime"],
}
state_map = {
"AK": "USA",
"AL": "USA",
"AR": "USA",
"AZ": "USA",
"CA": "USA",
"CO": "USA",
"CT": "USA",
"DC": "USA",
"DE": "USA",
"FL": "USA",
"GA": "USA",
"HI": "USA",
"IA": "USA",
"ID": "USA",
"IL": "USA",
"IN": "USA",
"KS": "USA",
"KY": "USA",
"LA": "USA",
"MA": "USA",
"MD": "USA",
"ME": "USA",
"MI": "USA",
"MN": "USA",
"MO": "USA",
"MS": "USA",
"MT": "USA",
"NC": "USA",
"ND": "USA",
"NE": "USA",
"NH": "USA",
"NJ": "USA",
"NM": "USA",
"NV": "USA",
"NY": "USA",
"OH": "USA",
"OK": "USA",
"OR": "USA",
"PA": "USA",
"RI": "USA",
"SC": "USA",
"SD": "USA",
"TN": "USA",
"TX": "USA",
"UT": "USA",
"VA": "USA",
"VT": "USA",
"WA": "USA",
"WI": "USA",
"WV": "USA",
"WY": "USA",
"Alabama": "USA",
"Alaska": "USA",
"Arizona": "USA",
"Arkansas": "USA",
"California": "USA",
"Colorado": "USA",
"Connecticut": "USA",
"Delaware": "USA",
"Florida": "USA",
"Georgia": "USA",
"Hawaii": "USA",
"Idaho": "USA",
"Illinois": "USA",
"Indiana": "USA",
"Iowa": "USA",
"Kansas": "USA",
"Kentucky": "USA",
"Louisiana": "USA",
"Maine": "USA",
"Maryland": "USA",
"Massachusetts": "USA",
"Michigan": "USA",
"Minnesota": "USA",
"Mississippi": "USA",
"Missouri": "USA",
"Montana": "USA",
"Nebraska": "USA",
"Nevada": "USA",
"New Hampshire": "USA",
"New Jersey": "USA",
"New Mexico": "USA",
"New York": "USA",
"North Carolina": "USA",
"North Dakota": "USA",
"Ohio": "USA",
"Oklahoma": "USA",
"Oregon": "USA",
"Pennsylvania": "USA",
"Rhode Island": "USA",
"South Carolina": "USA",
"South Dakota": "USA",
"Tennessee": "USA",
"Texas": "USA",
"Utah": "USA",
"Vermont": "USA",
"Virginia": "USA",
"Washington": "USA",
"West Virginia": "USA",
"Wisconsin": "USA",
"Wyoming": "USA",
}
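# Note: state_map is not referenced in this module; presumably importing
# scrapers use it to map US state birthplaces to a country, e.g.
# state_map.get("Texas") -> "USA".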

## Helper functions to convert from Aylo's API to Stash's scraper return types
def to_scraped_performer(
performer_from_api: dict, site: str | None = None
) -> ScrapedPerformer:
    if (brand := dig(performer_from_api, "brand")) and brand not in (
        "actorsandtags",
        # Older sites use this brand for performers
        "phpactors",
    ):
wrong_type = performer_from_api.get("type", "mystery")
wrong_id = performer_from_api.get("id", "unknown")
log.error(f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a scene.")
raise ValueError("Invalid performer from API")
# This is all we get when scraped as part of a scene or movie
performer: ScrapedPerformer = {
"name": performer_from_api["name"],
"gender": performer_from_api["gender"],
}
if aliases := ", ".join(
alias
for alias in performer_from_api.get("aliases", [])
if alias.lower() != performer["name"].lower()
):
performer["aliases"] = aliases
if details := performer_from_api.get("bio"):
performer["details"] = details
# All remaining fields are only available when scraped directly
if height := performer_from_api.get("height"):
        # API reports height in inches; convert to cm
performer["height"] = str(round(height * 2.54))
if weight := performer_from_api.get("weight"):
        # API reports weight in pounds; convert to kg
performer["weight"] = str(round(weight / 2.205))
if birthdate := performer_from_api.get("birthday"):
performer["birthdate"] = datetime.strptime(
birthdate, "%Y-%m-%dT%H:%M:%S%z"
).strftime("%Y-%m-%d")
if birthplace := performer_from_api.get("birthPlace"):
performer["country"] = birthplace
if measurements := performer_from_api.get("measurements"):
performer["measurements"] = measurements
images = dig(performer_from_api, "images", "master_profile") or {}
    # Performers can have multiple images; try to get the biggest versions
if images := [
img
for alt in images.values()
if (img := dig(alt, ("xx", "xl", "lg", "md", "sm"), "url"))
]:
performer["images"] = images
if tags := to_tags(performer_from_api):
performer["tags"] = tags
if site:
performer["url"] = _construct_performer_url(performer_from_api, site)
return performer
def to_scraped_movie(movie_from_api: dict) -> ScrapedMovie:
    if movie_from_api["type"] != "movie":
wrong_type = movie_from_api["type"]
wrong_id = movie_from_api["id"]
log.error(f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a movie.")
raise ValueError("Invalid movie from API")
movie: ScrapedMovie = {
"name": movie_from_api["title"],
"synopsis": dig(movie_from_api, "description"),
"front_image": dig(movie_from_api, "images", "cover", "0", "xx", "url"),
"url": _construct_url(movie_from_api),
}
if date := dig(movie_from_api, "dateReleased"):
movie["date"] = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z").strftime(
"%Y-%m-%d"
)
if studio := get_studio(movie_from_api):
movie["studio"] = studio
return movie
def to_scraped_scene(scene_from_api: dict) -> ScrapedScene:
    if scene_from_api["type"] != "scene":
wrong_type = scene_from_api["type"]
wrong_id = scene_from_api["id"]
log.error(f"Attempted to scrape a '{wrong_type}' (ID: {wrong_id}) as a scene.")
raise ValueError("Invalid scene from API")
scene: ScrapedScene = {
"title": scene_from_api["title"],
"code": str(scene_from_api["id"]),
"details": dig(scene_from_api, "description"),
"date": datetime.strptime(
scene_from_api["dateReleased"], "%Y-%m-%dT%H:%M:%S%z"
).strftime("%Y-%m-%d"),
"url": _construct_url(scene_from_api),
"performers": [
to_scraped_performer(p, dig(scene_from_api, "brand"))
for p in scene_from_api["actors"]
],
"tags": to_tags(scene_from_api),
}
if image := dig(
scene_from_api,
"images",
("poster", "poster_fallback"),
"0",
("xx", "xl", "lg", "md", "sm", "xs"),
"url",
):
scene["image"] = image
if dig(scene_from_api, "parent", "type") == "movie":
scene["movies"] = [to_scraped_movie(scene_from_api["parent"])]
if studio := get_studio(scene_from_api):
scene["studio"] = studio
if markers := scene_from_api.get("timeTags"):
if config.scrape_markers:
scene["markers"] = [to_marker(m) for m in markers]
else:
log.debug(
f"This scene has {len(markers)} markers,"
" you can enable scraping them in config.ini"
)
return scene

## Primary functions used to scrape from Aylo's API
def scene_from_url(
url, postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess
) -> ScrapedScene | None:
"""
Scrapes a scene from a URL, running an optional postprocess function on the result
"""
if not (match := re.search(r"/(\d+)/", url)):
log.error(
"Can't get the ID of the Scene. "
"Are you sure that URL is from a site in the Aylo Network?"
)
return None
scene_id = match.group(1)
log.debug(f"Scene ID: {scene_id}")
# Extract the domain from the URL
domain = domains.site_name(url)
api_URL = f"https://site-api.project1service.com/v2/releases/{scene_id}"
api_headers = _create_headers_for(domain)
api_scene_json = __api_request(api_URL, api_headers)
if not api_scene_json:
return None
    # If the URL points to a trailer, we can still get the correct scene data from its parent
if (
dig(api_scene_json, "type") != "scene"
and dig(api_scene_json, "parent", "type") == "scene"
):
log.debug("Result is a trailer, getting scene data from parent")
api_scene_json = api_scene_json["parent"]
return postprocess(to_scraped_scene(api_scene_json), api_scene_json)
def performer_from_url(
url,
postprocess: Callable[
[ScrapedPerformer, dict], ScrapedPerformer
] = default_postprocess,
) -> ScrapedPerformer | None:
"""
Scrapes a performer from a URL, running an optional postprocess function on the result
"""
if not (match := re.search(r"/(\d+)/", url)):
log.error(
"Can't get the ID of the performer. "
"Are you sure that URL is from a site in the Aylo Network?"
)
return None
performer_id = match.group(1)
log.debug(f"Performer ID: {performer_id}")
# Extract the domain from the URL
domain = urlparse(url).netloc.split(".")[-2]
api_URL = f"https://site-api.project1service.com/v1/actors/{performer_id}"
api_headers = _create_headers_for(domain)
api_performer_json = __api_request(api_URL, api_headers)
if not api_performer_json:
return None
return postprocess(to_scraped_performer(api_performer_json), api_performer_json)
def movie_from_url(
url, postprocess: Callable[[ScrapedMovie, dict], ScrapedMovie] = default_postprocess
) -> ScrapedMovie | None:
"""
Scrapes a movie from a URL, running an optional postprocess function on the result
"""
if not (match := re.search(r"/(\d+)/", url)):
log.error(
"Can't get the ID of the movie. "
"Are you sure that URL is from a site in the Aylo Network?"
)
return None
movie_id = match.group(1)
log.debug(f"Movie ID: {movie_id}")
# Extract the domain from the URL
domain = urlparse(url).netloc.split(".")[-2]
api_URL = f"https://site-api.project1service.com/v2/releases/{movie_id}"
api_headers = _create_headers_for(domain)
api_movie_json = __api_request(api_URL, api_headers)
if not api_movie_json:
return None
if dig(api_movie_json, "type") == "movie":
return postprocess(to_scraped_movie(api_movie_json), api_movie_json)
    # If the URL points to a scene or trailer, we can still get the correct movie data from its parent
if dig(api_movie_json, "parent", "type") == "movie":
log.debug("Result is a scene or trailer, getting movie data from parent")
return postprocess(
to_scraped_movie(api_movie_json["parent"]), api_movie_json["parent"]
)
# Since the "Scrape with..." function in Stash expects a single result, we provide
# this function to return the first result that exceeds the threshold so
# that users don't need to use scene_search directly and THEN take the first result
def find_scene(
query: str,
search_domains: list[str] | None = None,
min_ratio: float = 0.9,
postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> ScrapedScene | None:
"""
Searches the Aylo API for scenes matching the given query and returns the
first match that exceeds `min_ratio` similarity: a float between 0 and 1.
    Differs from `scene_search` in that it only returns the first match,
returning early as soon as it finds a match that exceeds the threshold.
If search_domains is provided it will only search those domains,
otherwise it will search all (this could be very slow!)
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
If postprocess is provided it will be called on the result before returning
"""
if not query:
log.error("No query provided")
return None
if not search_domains:
log.warning("Searcing all known domains, this could be very slow!")
search_domains = domains.all_domains()
log.debug(f"Matching '{query}' against {len(search_domains)} sites")
def matcher(candidate_title: str):
return round(
difflib.SequenceMatcher(
None, query.lower(), candidate_title.lower()
).ratio(),
3,
)
for domain in search_domains:
log.debug(f"Searching '{domain}'")
api_headers = _create_headers_for(domain)
search_url = f"https://site-api.project1service.com/v2/releases?search={query}&type=scene"
api_response = __api_request(search_url, api_headers)
if api_response is None:
log.error(f"Failed to search '{domain}'")
continue
if not api_response:
log.debug(f"No results from '{domain}'")
continue
best_match = max(api_response, key=lambda x: matcher(x["title"]))
ratio = matcher(best_match["title"])
if ratio >= min_ratio:
log.info(
f"Found scene '{best_match['title']}' with {ratio:.2%} similarity "
f"to '{query}' (exceeds {min_ratio:.2%} threshold) "
f"on '{domain}'"
)
return postprocess(to_scraped_scene(best_match), best_match)
else:
log.info(
f"Giving up on '{domain}': best result '{best_match['title']}' "
f"with {ratio:.2%} similarity"
)
log.error(f"No scenes found for '{query}'")
return None
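
# Example usage (hypothetical query and sites):
#   find_scene("Some Scene Title", search_domains=["brazzers", "realitykings"], min_ratio=0.8)
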
# Since the "Scrape with..." function in Stash expects a single result, we provide
# this function to return the first result that exceeds the threshold so
# that users don't need to use performer_search directly and THEN take the first result
def find_performer(
query: str,
search_domains: list[str] | None = None,
min_ratio: float = 0.9,
postprocess: Callable[
[ScrapedPerformer, dict], ScrapedPerformer
] = default_postprocess,
) -> ScrapedPerformer | None:
"""
Searches the Aylo API for performers matching the given query and returns the
first match that exceeds `min_ratio` similarity: a float between 0 and 1.
    Differs from `performer_search` in that it only returns the first match,
returning early as soon as it finds a match that exceeds the threshold.
If search_domains is provided it will only search those domains,
otherwise it will search all (this could be very slow!)
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
If postprocess is provided it will be called on the result before returning
"""
if not query:
log.error("No query provided")
return None
if not search_domains:
log.warning("Searcing all known domains, this could be very slow!")
search_domains = domains.all_domains()
log.debug(f"Matching '{query}' against {len(search_domains)} sites")
def matcher(candidate_name: str):
return round(
difflib.SequenceMatcher(
None, query.lower(), candidate_name.lower()
).ratio(),
3,
)
for domain in search_domains:
log.debug(f"Searching {domain}")
api_headers = _create_headers_for(domain)
search_url = f"https://site-api.project1service.com/v1/actors?search={query}"
api_response = __api_request(search_url, api_headers)
if api_response is None:
log.error(f"Failed to search {domain}")
continue
if not api_response:
log.debug(f"No results from {domain}")
continue
best_match = max(api_response, key=lambda x: matcher(x["name"]))
ratio = matcher(best_match["name"])
if ratio >= min_ratio:
log.info(
f"Found performer '{best_match['name']}' with {ratio:.2%} similarity "
f"to '{query}' (exceeds {min_ratio:.2%} threshold) "
f"on '{domain}'"
)
return postprocess(to_scraped_performer(best_match, domain), best_match)
else:
log.info(
f"Giving up on '{domain}': best result '{best_match['name']}' "
f"with {ratio:.2%} similarity"
)
log.error(f"No performers found for '{query}'")
return None
def scene_search(
query: str,
search_domains: list[str] | None = None,
postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> list[ScrapedScene]:
"""
Searches the Aylo API for the given query and returns a list of ScrapedScene
If search_domains is provided it will only search those domains,
otherwise it will search all known domains (this could be very slow!)
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
If postprocess is provided it will be called on each result before returning
"""
if not query:
log.error("No query provided")
return []
if not search_domains:
log.warning("Searcing all known domains, this could be very slow!")
search_domains = domains.all_domains()
log.debug(f"Searching for '{query}' on {len(search_domains)} sites")
# The source of the results will be based on the token used (Brazzers, Reality Kings, etc.)
search_url = f"https://site-api.project1service.com/v2/releases?search={query}&type=scene&limit=10"
search_results = []
already_seen = set()
def matcher(candidate: ScrapedScene):
return round(
difflib.SequenceMatcher(
None,
query.lower(),
candidate["title"].lower(), # type: ignore (title is always set)
).ratio(),
3,
)
for domain in search_domains:
log.debug(f"Searching {domain}")
api_headers = _create_headers_for(domain)
api_response = __api_request(search_url, api_headers)
if api_response is None:
log.error(f"Failed to search {domain}")
continue
if not api_response:
log.debug(f"No results from {domain}")
continue
        candidates = [
            postprocess(to_scraped_scene(result), result)
            for result in api_response
            # already_seen stores scene codes as strings, so compare as a string
            if str(result["id"]) not in already_seen
        ]
search_results.extend(
c
for c in candidates
if matcher(c) > 0.5 and c.get("code") not in already_seen
)
already_seen.update(c.get("code") for c in candidates)
        # Try to avoid more than 10ish results or this will take forever
if len(search_results) >= 10:
log.warning("Found more than 10 results, stopping search")
break
log.info(f"Search finished, found {len(search_results)} candidates")
return sorted(search_results, key=matcher, reverse=True)
def performer_search(
query: str,
search_domains: list[str] | None = None,
postprocess: Callable[
[ScrapedPerformer, dict], ScrapedPerformer
] = default_postprocess,
) -> list[ScrapedPerformer]:
"""
Searches the Aylo API for the given query and returns a list of ScrapedPerformer
If search_domains is provided it will only search those domains,
otherwise it will search all known domains (this could be very slow!)
Domains should not include the "www." or ".com" parts of the domain: 'brazzers', 'realitykings', etc.
If postprocess is provided it will be called on each result before returning
"""
if not query:
log.error("No query provided")
return []
if not search_domains:
log.warning("Searcing all known domains, this could be very slow!")
search_domains = domains.all_domains()
log.debug(f"Searching for '{query}' on {len(search_domains)} sites")
# The source of the results will be based on the token used (Brazzers, Reality Kings, etc.)
search_url = (
f"https://site-api.project1service.com/v1/actors?search={query}&limit=10"
)
search_results = []
already_seen = set()
def matcher(candidate: ScrapedPerformer):
return round(
difflib.SequenceMatcher(
None,
query.lower(),
candidate["name"].lower(), # type: ignore (name is always set)
).ratio(),
3,
)
for domain in search_domains:
log.debug(f"Searching {domain}")
api_headers = _create_headers_for(domain)
api_response = __api_request(search_url, api_headers)
if api_response is None:
log.error(f"Failed to search {domain}")
continue
if not api_response:
log.debug(f"No results from {domain}")
continue
candidates = [
postprocess(to_scraped_performer(result, domain), result)
for result in api_response
]
search_results.extend(
c
for c in candidates
if matcher(c) > 0.5 and c.get("name") not in already_seen
)
already_seen.update(c.get("name") for c in candidates)
        # Try to avoid more than 10ish results or this will take forever
if len(search_results) >= 10:
log.warning("Found more than 10 results, stopping search")
break
log.debug(f"Search finished, found {len(search_results)} candidates")
return sorted(search_results, key=matcher, reverse=True)
def scene_from_fragment(
fragment: dict,
search_domains: list[str] | None = None,
min_ratio=config.minimum_similarity,
postprocess: Callable[[ScrapedScene, dict], ScrapedScene] = default_postprocess,
) -> ScrapedScene | None:
"""
Scrapes a scene from a fragment, which must contain at least one of the following:
- url: the URL of the scene
- title: the title of the scene
    If search_domains is provided it will only search those domains,
otherwise it will search all known domains (this could be very slow!)
If min_ratio is provided _AND_ the fragment contains a title but no URL,
the search will only return a scene if a match with at least that ratio is found
If postprocess is provided it will be called on the result before returning
"""
log.debug(f"Fragment scraping scene {fragment['id']}")
if url := fragment.get("url"):
log.debug(f"Using scene URL: '{url}'")
if scene := scene_from_url(url, postprocess=postprocess):
if (
fragment["id"]
and config.scrape_markers
and (markers := scene.pop("markers", []))
):
add_markers(fragment["id"], markers)
return scene
log.debug("Failed to scrape scene from URL")
if title := fragment.get("title"):
log.debug(f"Searching for '{title}'")
if scene := find_scene(
title, search_domains, min_ratio, postprocess=postprocess
):
return scene
log.debug("Failed to find scene by title")
log.warning("Cannot scrape from this fragment: need to have title or url set")
def performer_from_fragment(
fragment: dict,
search_domains: list[str] | None = None,
min_ratio=0.9,
postprocess: Callable[
[ScrapedPerformer, dict], ScrapedPerformer
] = default_postprocess,
) -> ScrapedPerformer | None:
"""
Scrapes a performer from a fragment, which must contain one of the following:
- url: the URL of the performer page (anywhere in the Aylo network)
- name: the name of the performer
    If search_domains is provided it will only search those domains,
otherwise it will search all known domains (this could be very slow!)
    If min_ratio is provided _AND_ the fragment contains a name but no URL,
    the search will only return a performer if a match with at least that ratio is found
If postprocess is provided it will be called on the result before returning
"""
log.debug("Fragment scraping performer...")
if url := fragment.get("url"):
log.debug(f"Using performer URL: '{url}'")
return performer_from_url(url, postprocess=postprocess)
elif name := fragment.get("name"):
log.debug(f"Searching for '{name}'")
return find_performer(name, search_domains, min_ratio, postprocess=postprocess)
log.warning("Cannot scrape from this fragment: need to have url or name set")
def main_scraper():
"""
Takes arguments from stdin or from the command line and dumps output as JSON to stdout
"""
op, args = scraper_args()
result = None
match op, args:
case "scene-by-url", {"url": url} if url:
result = scene_from_url(url)
case "scene-by-name", {"name": name, "extra": _domains} if name:
result = scene_search(name, search_domains=_domains)
case "scene-by-fragment" | "scene-by-query-fragment", args:
_domains = args.get("extra", None)
result = scene_from_fragment(args, search_domains=_domains)
case "performer-by-url", {"url": url}:
result = performer_from_url(url)
case "performer-by-fragment", args:
_domains = args.get("extra", None)
result = performer_from_fragment(args, search_domains=_domains)
case "performer-by-name", {"name": name, "extra": _domains} if name:
result = performer_search(name, search_domains=_domains)
case "movie-by-url", {"url": url} if url:
result = movie_from_url(url)
case _:
log.error(f"Operation: {op}, arguments: {json.dumps(args)}")
sys.exit(1)
print(json.dumps(result))
if __name__ == "__main__":
main_scraper()
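
# Typical invocation when wired up as a Stash scraper (the exact argument
# contract is defined by py_common.util.scraper_args; this is an illustrative
# sketch, not a guaranteed interface):
#   echo '{"url": "https://www.brazzers.com/scene/123/some-title"}' | python scrape.py scene-by-url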