Skip to content

Commit

Permalink
Userdata-only parser for Touhou 9.5 (Shoot the Bullet). (#506)
Browse files Browse the repository at this point in the history
* Userdata-only parser for Touhou 9.5 (Shoot the Bullet).

* Fix spell card ID

* Cleanly reject games that are parsable but not really supported yet.

* Get rid of the gimmicky fake spell card ID and add real fields instead.

Not sure exactly what the right way to do this is, but this should be fine for now.

* Fix linter error
  • Loading branch information
n-rook authored Nov 2, 2024
1 parent c821c80 commit 42b5eb1
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 7 deletions.
40 changes: 34 additions & 6 deletions project/thscoreboard/replays/constant_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from replays import models


def _Game():
return apps.apps.get_model("replays", "Game")


def _Route():
return apps.apps.get_model("replays", "Route")

Expand All @@ -20,6 +24,14 @@ def _Shot():
return apps.apps.get_model("replays", "Shot")


class UnknownGameError(Exception):
"""An ID for a constant row did not match any row in the database."""

def __init__(self, game_id: str):
super().__init__(self, game_id)
self.id = game_id


@dataclasses.dataclass(frozen=True)
class ReplayConstantModels:
game: models.Game
Expand All @@ -30,12 +42,28 @@ class ReplayConstantModels:
def GetModelInstancesForReplay(
replay_info: replay_parsing.ReplayInfo,
) -> models.ReplayConstantModels:
"""Get the constant model instances related to this replay."""
shot = (
_Shot()
.objects.select_related("game")
.get(game=replay_info.game, shot_id=replay_info.shot)
)
"""Get the constant model instances related to this replay.
Raises:
UnknownGameError: If the "game" field of the replay does not correspond
to a row in the database.
"""
try:
shot = (
_Shot()
.objects.select_related("game")
.get(game=replay_info.game, shot_id=replay_info.shot)
)
except _Shot().DoesNotExist:
# If the game also does not exist, we can raise an appropriate exception.
# If the game exists but the shot does not, either we have a bug or the
# replay is extremely bizarre.
try:
_ = _Game().objects.get(game_id=replay_info.game)
except _Game().DoesNotExist:
raise UnknownGameError(replay_info.game)
raise

if replay_info.route:
route = _Route().objects.get(game=shot.game, route_id=replay_info.route)
else:
Expand Down
1 change: 1 addition & 0 deletions project/thscoreboard/replays/game_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class GameIDs:
TH07 = "th07"
TH08 = "th08"
TH09 = "th09"
TH095 = "th095"
TH10 = "th10"
TH11 = "th11"
TH12 = "th12"
Expand Down
103 changes: 103 additions & 0 deletions project/thscoreboard/replays/kaitai_parsers/th095_encrypted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO


if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))

class Th095Encrypted(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.header = Th095Encrypted.FileHeader(self._io, self, self._root)

class FileHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.magic = self._io.read_bytes(4)
if not self.magic == b"\x74\x39\x35\x72":
raise kaitaistruct.ValidationNotEqualError(b"\x74\x39\x35\x72", self.magic, self._io, u"/types/file_header/seq/0")
self.unknown_1 = self._io.read_bytes(8)
self.userdata_offset = self._io.read_u4le()


class Userdata(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.magic_user = self._io.read_bytes(4)
if not self.magic_user == b"\x55\x53\x45\x52":
raise kaitaistruct.ValidationNotEqualError(b"\x55\x53\x45\x52", self.magic_user, self._io, u"/types/userdata/seq/0")
self.user_length = self._io.read_u4le()
self.unknown = self._io.read_bytes(4)
self.user_desc = []
i = 0
while True:
_ = self._io.read_u1()
self.user_desc.append(_)
if _ == 13:
break
i += 1
self.user_desc_term = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII")
self.version = Th095Encrypted.UserdataField(u"Version", self._io, self, self._root)
self.username = Th095Encrypted.UserdataField(u"Name", self._io, self, self._root)
self.level = Th095Encrypted.UserdataField(u"Level", self._io, self, self._root)
self.scene = Th095Encrypted.UserdataField(u"Scene", self._io, self, self._root)
self.date = Th095Encrypted.UserdataField(u"Date", self._io, self, self._root)
self.score = Th095Encrypted.UserdataField(u"Score", self._io, self, self._root)
self.slowdown = Th095Encrypted.UserdataField(u"Slow Rate", self._io, self, self._root)


class UserdataField(KaitaiStruct):
def __init__(self, expected_name, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self.expected_name = expected_name
self._read()

def _read(self):
self.name = (self._io.read_bytes(len(self.expected_name))).decode(u"ASCII")
if not self.name == self.expected_name:
raise kaitaistruct.ValidationNotEqualError(self.expected_name, self.name, self._io, u"/types/userdata_field/seq/0")
self.name_value_separator_space = self._io.read_bytes(1)
if not self.name_value_separator_space == b"\x20":
raise kaitaistruct.ValidationNotEqualError(b"\x20", self.name_value_separator_space, self._io, u"/types/userdata_field/seq/1")
self.value_with_space = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII")

@property
def value(self):
if hasattr(self, '_m_value'):
return self._m_value

self._m_value = (self.value_with_space)[0:(len(self.value_with_space) - 1)]
return getattr(self, '_m_value', None)


@property
def userdata(self):
if hasattr(self, '_m_userdata'):
return self._m_userdata

_pos = self._io.pos()
self._io.seek(self.header.userdata_offset)
self._m_userdata = Th095Encrypted.Userdata(self._io, self, self._root)
self._io.seek(_pos)
return getattr(self, '_m_userdata', None)


27 changes: 27 additions & 0 deletions project/thscoreboard/replays/replay_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .kaitai_parsers import th07
from .kaitai_parsers import th08
from .kaitai_parsers import th09
from .kaitai_parsers import th095_encrypted
from .kaitai_parsers import th10
from .kaitai_parsers import th11
from .kaitai_parsers import th12
Expand Down Expand Up @@ -90,7 +91,11 @@ class ReplayInfo:
name: str
replay_type: int
route: Optional[str] = None

spell_card_id: Optional[int] = None
scene_game_level: Optional[int] = None
scene_game_scene: Optional[int] = None

stages: List[ReplayStage] = dataclasses.field(default_factory=list)
slowdown: Optional[float] = None

Expand Down Expand Up @@ -446,6 +451,26 @@ def _Parse09(rep_raw):
return r


def _Parse095(rep_raw):
encrypted_replay = th095_encrypted.Th095Encrypted.from_bytes(rep_raw)

spell_level = int(encrypted_replay.userdata.level.value)
spell_scene = int(encrypted_replay.userdata.scene.value)

return ReplayInfo(
game=game_ids.GameIDs.TH095,
shot="Aya",
difficulty=0,
score=int(encrypted_replay.userdata.score.value),
timestamp=time.strptime(encrypted_replay.userdata.date.value, "%y/%m/%d %H:%M"),
name=encrypted_replay.userdata.username.value,
replay_type=game_ids.ReplayTypes.SPELL_PRACTICE,
scene_game_level=spell_level,
scene_game_scene=spell_scene,
slowdown=float(encrypted_replay.userdata.slowdown.value),
)


def _Parse10(rep_raw):
header = th_modern.ThModern.from_bytes(rep_raw)
comp_data = bytearray(header.main.comp_data)
Expand Down Expand Up @@ -1104,6 +1129,8 @@ def Parse(replay) -> ReplayInfo:
return _Parse08(replay)
elif gamecode == b"T9RP":
return _Parse09(replay)
elif gamecode == b"t95r":
return _Parse095(replay)
elif gamecode == b"t10r":
return _Parse10(replay)
elif gamecode == b"t11r":
Expand Down
Binary file not shown.
Binary file not shown.
10 changes: 10 additions & 0 deletions project/thscoreboard/replays/test_constant_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ def testGetWithRoute(self):
self.assertEqual(constants.shot.shot_id, "Yukari")
self.assertIsNotNone(constants.route)
self.assertEqual(constants.route.route_id, "Final B")

def testGetUnknownGame(self):
replay_file_contents = test_replays.GetRaw("th10_normal")
replay_info = replay_parsing.Parse(replay_file_contents)
replay_info.game = "th5000"

with self.assertRaises(constant_helpers.UnknownGameError) as ctx:
constant_helpers.GetModelInstancesForReplay(replay_info)

self.assertEqual(ctx.exception.id, "th5000")
28 changes: 28 additions & 0 deletions project/thscoreboard/replays/test_replay_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,34 @@ def testPVP(self):
self.assertEqual(s.th09_p2_score, 0)


class Th095ReplayTestCase(unittest.TestCase):
def testOne(self):
r = ParseTestReplay("th95_3-1")
self.assertEqual(r.game, "th095")
self.assertEqual(r.score, 132030)
self.assertEqual(
r.timestamp,
datetime.datetime(2024, 4, 27, 16, 42, tzinfo=datetime.timezone.utc),
)
self.assertEqual(r.scene_game_level, 3)
self.assertEqual(r.scene_game_scene, 1)
self.assertEqual(r.name, "nrook3.1")
self.assertEqual(r.slowdown, 0.00)

def testTwo(self):
r = ParseTestReplay("th95_2-5")
self.assertEqual(r.game, "th095")
self.assertEqual(r.score, 128830)
self.assertEqual(
r.timestamp,
datetime.datetime(2024, 4, 27, 19, 54, tzinfo=datetime.timezone.utc),
)
self.assertEqual(r.scene_game_level, 2)
self.assertEqual(r.scene_game_scene, 5)
self.assertEqual(r.name, "nrook ")
self.assertEqual(r.slowdown, 0.00)


class Th10ReplayTestCase(unittest.TestCase):
def testNormal(self):
r = ParseTestReplay("th10_normal")
Expand Down
14 changes: 13 additions & 1 deletion project/thscoreboard/replays/views/create_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.http import Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext as _
from django.views.decorators import http as http_decorators
from django.contrib.auth import decorators as auth_decorators
from django.core.exceptions import ValidationError
Expand Down Expand Up @@ -41,10 +42,21 @@ def _HandleReplay(request, replay_bytes):
)

try:
return replay_parsing.Parse(replay_bytes)
replay_info = replay_parsing.Parse(replay_bytes)
except replay_parsing.Error as e:
raise ValidationError(str(e))

try:
constant_helpers.GetModelInstancesForReplay(replay_info)
except constant_helpers.UnknownGameError:
# It would be nice to actually specify the game here, but adding formatting parameters
# runs into issues with Django rendering.
raise ValidationError(
message=_("This game is not yet supported."),
)

return replay_info


@auth_decorators.login_required
@http_decorators.require_http_methods(["GET", "HEAD", "POST"])
Expand Down
71 changes: 71 additions & 0 deletions ref/threp-ksy/th095_encrypted.ksy
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
meta:
id: th095_encrypted
file-extension: rpy
endian: le
seq:
- id: header
type: file_header
instances:
userdata:
pos: header.userdata_offset
type: userdata
types:
file_header:
seq:
- id: magic
contents: t95r
# Probably includes version
- id: unknown_1
size: 8
- id: userdata_offset
type: u4
userdata:
seq:
- id: magic_user
contents: USER
- id: user_length
type: u4
- id: unknown
size: 4
- id: user_desc
type: u1
repeat: until
repeat-until: _ == 0xd
- id: user_desc_term
type: str
terminator: 0xa
encoding: ASCII
- id: version
type: userdata_field("Version")
- id: username
type: userdata_field("Name")
- id: level
type: userdata_field("Level")
- id: scene
type: userdata_field("Scene")
- id: date
type: userdata_field("Date")
- id: score
type: userdata_field("Score")
- id: slowdown
type: userdata_field("Slow Rate")
userdata_field:
params:
- id: expected_name
type: str
seq:
- id: name
type: str
size: expected_name.length
encoding: ASCII
valid: expected_name
- id: name_value_separator_space
contents: " "
- id: value_with_space
type: str
# Always ends with 0x0d0a; that is, space then LF
terminator: 0x0a
encoding: ASCII
instances:
value:
value: value_with_space.substring(0, value_with_space.length - 1)

0 comments on commit 42b5eb1

Please sign in to comment.