Skip to content

Commit

Permalink
[feat][command] implements ls-files
Browse files Browse the repository at this point in the history
  • Loading branch information
smd1121 committed Jan 4, 2024
1 parent b8ea839 commit 3a9a62a
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 4 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ jobs:
- name: Run tests
run: |
cd xgit
coverage run -m pytest .
coverage run -m pytest xgit
coverage report -m
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ __pycache__
.mypy_cache
*.egg-info/
.pytest_cache/
.coverage
3 changes: 2 additions & 1 deletion xgit/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import typer

from xgit.commands import init, cat_file, hash_object
from xgit.commands import init, cat_file, ls_files, hash_object

app = typer.Typer(add_completion=False, rich_markup_mode="markdown")


app.command()(hash_object.hash_object)
app.command()(init.init)
app.command()(cat_file.cat_file)
app.command()(ls_files.ls_files)


def main():
Expand Down
23 changes: 23 additions & 0 deletions xgit/commands/ls_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path

import typer
from typer import Option
from typing_extensions import Annotated

from xgit.types.index import get_index
from xgit.utils.utils import find_repo


def ls_files(
full_name: Annotated[bool, Option("--full-name", help="输出相对于项目根目录,而非当前目录")] = False,
):
index = get_index()
for entry in index.entries:
f = find_repo() / entry.file_name
cwd = Path.cwd()

if f.is_relative_to(cwd):
if full_name:
typer.echo(entry.file_name)
else:
typer.echo(f.relative_to(cwd))
19 changes: 19 additions & 0 deletions xgit/test/test_ls_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os

from typer.testing import CliRunner

from xgit.test.test_utils import check_same_output

runner = CliRunner()


def test_ls_files():
assert check_same_output(["ls-files"])

cwd = os.getcwd()
try:
os.chdir("xgit")
assert check_same_output(["ls-files", "--full-name"])
assert check_same_output(["ls-files"])
finally:
os.chdir(cwd)
3 changes: 2 additions & 1 deletion xgit/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def check_same_output(cmd: list[str]) -> bool:

if git_result.stdout != xgit_stdout:
logger.info(f"cmd: {cmd}")
logger.info(f"stdout not equal: {git_result.stdout} != {xgit_stdout}")
logger.info(f"stdout not equal: {git_result.stdout!r}\n!=\n{xgit_stdout!r}")
logger.info(f"stdout not equal: {git_result.stdout.decode()}\n!=\n{xgit_stdout.decode()}")
return False

return True
202 changes: 202 additions & 0 deletions xgit/types/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import hashlib
from typing import Optional

from xgit.utils.utils import find_repo
from xgit.utils.constants import GIT_DIR


def print_bytes(data, group_size=4, group_each_line=6):
def is_printable(byte):
return 32 <= byte <= 126

byte_each_line = group_size * group_each_line

n = len(data)
for i in range(0, n, byte_each_line):
line = data[i : i + byte_each_line]
for j in range(0, len(line), group_size):
group = line[j : j + group_size]
print("0x" + "".join(f"{byte:02x}" for byte in group).upper(), end=" ")
print()

for j in range(0, len(line), group_size):
group = line[j : j + group_size]
print(" ", end="")
print(" ".join(chr(byte) if is_printable(byte) else "*" for byte in group), end=" ")
print("\n")


class IndexEntry:
class Flag:
assume_valid: bool
extended: bool
stage: int
name_length: int

def __init__(self, assume_valid, extended, stage, name_length):
self.assume_valid = assume_valid
self.extended = extended
self.stage = stage
self.name_length = name_length

@staticmethod
def from_bytes(data: bytes):
assert len(data) == 2
flag = int.from_bytes(data, "big")
assume_valid = flag & 0x8000 != 0
extended = flag & 0x4000 != 0
stage = (flag & 0x3000) >> 12
name_length = flag & 0x0FFF
return IndexEntry.Flag(assume_valid, extended, stage, name_length)

def to_bytes(self) -> bytes:
data = 0
if self.assume_valid:
data |= 0x8000
if self.extended:
data |= 0x4000
data |= (self.stage << 12) & 0x3000
data |= self.name_length
return data.to_bytes(2, "big")

ctime_s: int
ctime_ns: int
mtime_s: int
mtime_ns: int
dev: int
inode: int
mode: int
uid: int
gid: int
file_size: int
sha: str
flags: Flag
file_name: str

def __init__(
self, ctime_s, ctime_ns, mtime_s, mtime_ns, dev, inode, mode, uid, gid, file_size, sha, flags, file_name
):
self.ctime_s = ctime_s
self.ctime_ns = ctime_ns
self.mtime_s = mtime_s
self.mtime_ns = mtime_ns
self.dev = dev
self.inode = inode
self.mode = mode
self.uid = uid
self.gid = gid
self.file_size = file_size
self.sha = sha
self.flags = flags
self.file_name = file_name

@staticmethod
def parse(data: bytes) -> tuple["IndexEntry", bytes]:
# print_bytes(data)

ctime_s = int.from_bytes(data[:4], "big")
ctime_ns = int.from_bytes(data[4:8], "big")
mtime_s = int.from_bytes(data[8:12], "big")
mtime_ns = int.from_bytes(data[12:16], "big")
dev = int.from_bytes(data[16:20], "big")
inode = int.from_bytes(data[20:24], "big")
mode = int.from_bytes(data[24:28], "big")
uid = int.from_bytes(data[28:32], "big")
gid = int.from_bytes(data[32:36], "big")
file_size = int.from_bytes(data[36:40], "big")
sha = data[40:60].hex()
flags = IndexEntry.Flag.from_bytes(data[60:62])

if flags.name_length < 0xFFF:
file_name = data[62 : 62 + flags.name_length]
assert data[62 + flags.name_length] == 0
len = 62 + flags.name_length + 1
else:
# if name_length >= 0xFFF, then find `\x00` to get the file name
file_name, _ = data[62:].split(b"\x00", maxsplit=1)
len = 62 + len(file_name) + 1

len = (len + 7) // 8 * 8
rest = data[len:] # remove padding

return (
IndexEntry(
ctime_s,
ctime_ns,
mtime_s,
mtime_ns,
dev,
inode,
mode,
uid,
gid,
file_size,
sha,
flags,
file_name.decode(),
),
rest,
)

def to_bytes(self) -> bytes:
entry = self.ctime_s.to_bytes(4, "big")
entry += self.ctime_ns.to_bytes(4, "big")
entry += self.mtime_s.to_bytes(4, "big")
entry += self.mtime_ns.to_bytes(4, "big")
entry += self.dev.to_bytes(4, "big")
entry += self.inode.to_bytes(4, "big")
entry += self.mode.to_bytes(4, "big")
entry += self.uid.to_bytes(4, "big")
entry += self.gid.to_bytes(4, "big")
entry += self.file_size.to_bytes(4, "big")
entry += bytes.fromhex(self.sha)
entry += self.flags.to_bytes()
entry += self.file_name.encode()
entry += b"\x00"

# padding to 8 bytes
padding = (8 - len(entry) % 8) % 8
entry += b"\x00" * padding

return entry


class Index:
version: int
entry_count: int
entries: list[IndexEntry]
extensions: bytes

def __init__(self, data: Optional[bytes] = None):
if data is None:
self.version = 2
self.entry_count = 0
self.entries = []
else:
self.version = int.from_bytes(data[4:8], "big")
self.entry_count = int.from_bytes(data[8:12], "big")
self.entries = []
data = data[12:]
for _ in range(self.entry_count):
entry, data = IndexEntry.parse(data)
self.entries.append(entry)
self.extensions = data[:-20]

def to_bytes(self) -> bytes:
index = b"DIRC"
index += self.version.to_bytes(4, "big")
index += self.entry_count.to_bytes(4, "big")
index += b"".join(entry.to_bytes() for entry in self.entries)
index += self.extensions
index += hashlib.sha1(index).digest()
return index


def get_index() -> Index:
index_path = find_repo() / GIT_DIR / "index"
if not index_path.exists():
return Index()
with index_path.open("rb") as f:
data = f.read()
assert data[-20:] == hashlib.sha1(data[:-20]).digest()
return Index(data)

0 comments on commit 3a9a62a

Please sign in to comment.