From 6070a0f067b369c4c507cfb71ddf9d6d75924a3c Mon Sep 17 00:00:00 2001 From: Andreas Tsouloupas Date: Tue, 23 Apr 2024 19:49:02 +0200 Subject: [PATCH] add challenge andromeda-cloude-storage v2 (crypto) --- .../andromeda-cloud-storage-v2/challenge.yml | 34 ++ .../docker-compose.yml | 13 + .../public/server.py | 1 + .../setup/Dockerfile | 21 + .../setup/requirements.txt | 2 + .../setup/secret.py | 1 + .../setup/server.py | 388 ++++++++++++++++++ .../solution/solve.py | 203 +++++++++ 8 files changed, 663 insertions(+) create mode 100644 crypto/andromeda-cloud-storage-v2/challenge.yml create mode 100644 crypto/andromeda-cloud-storage-v2/docker-compose.yml create mode 120000 crypto/andromeda-cloud-storage-v2/public/server.py create mode 100644 crypto/andromeda-cloud-storage-v2/setup/Dockerfile create mode 100644 crypto/andromeda-cloud-storage-v2/setup/requirements.txt create mode 100644 crypto/andromeda-cloud-storage-v2/setup/secret.py create mode 100644 crypto/andromeda-cloud-storage-v2/setup/server.py create mode 100644 crypto/andromeda-cloud-storage-v2/solution/solve.py diff --git a/crypto/andromeda-cloud-storage-v2/challenge.yml b/crypto/andromeda-cloud-storage-v2/challenge.yml new file mode 100644 index 0000000..ae713e0 --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/challenge.yml @@ -0,0 +1,34 @@ +name: "Andromeda Cloud Storage v2" +author: "feltf" +category: crypto + +description: | + The Andromeda Cloud Storage v2.0.0 is a highly secure encrypted storage that + provides assurance to its users about their data. Go ahead and register your + account to store your data securely in an intergalactic way. After + registration, you will "soon" receive your personal AES and MAC keys that we + randomly generate so that you can read the sweet secret flag. + + New feature: Now we support the full range of bytes!!! + +value: 500 +type: dynamic_docker +extra: + initial: 500 + minimum: 100 + decay: 50 + redirect_type: direct + compose_stack: !filecontents docker-compose.yml + +flags: + - CCSC{p4dd1ng_oracl3_att4ck_due_to_flawed_implem3ntati0n_of_decryption!9f03268fc9363db00c7a43} + +tags: + - crypto + - medium-hard + +files: + - "public/server.py" + +state: hidden +version: "0.1" \ No newline at end of file diff --git a/crypto/andromeda-cloud-storage-v2/docker-compose.yml b/crypto/andromeda-cloud-storage-v2/docker-compose.yml new file mode 100644 index 0000000..0bbf62f --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/docker-compose.yml @@ -0,0 +1,13 @@ +version: "3.7" + +services: + challenge: + image: ghcr.io/cybermouflons/ccsc2024/andromeda-cloud-storage-v2:latest + restart: always + ports: + - 1337:1337 + build: + context: ./setup + dockerfile: Dockerfile + labels: + ctf.challenge.name: andromeda-cloud-storage-v2 \ No newline at end of file diff --git a/crypto/andromeda-cloud-storage-v2/public/server.py b/crypto/andromeda-cloud-storage-v2/public/server.py new file mode 120000 index 0000000..1384cfe --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/public/server.py @@ -0,0 +1 @@ +../setup/server.py \ No newline at end of file diff --git a/crypto/andromeda-cloud-storage-v2/setup/Dockerfile b/crypto/andromeda-cloud-storage-v2/setup/Dockerfile new file mode 100644 index 0000000..70c936f --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/setup/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.10.2-alpine + +RUN apk add --no-cache gcc musl-dev gmp-dev + +RUN addgroup -S ctf && adduser -S ctf -G ctf +RUN mkdir /app +WORKDIR /app + +COPY requirements.txt /app/ +RUN pip install -r requirements.txt + +COPY secret.py/ /app/ +COPY server.py/ /app/ + +RUN chown ctf -R /app +RUN chmod a+x /app/server.py + +EXPOSE 1337 + +USER ctf +CMD ["/app/server.py"] diff --git a/crypto/andromeda-cloud-storage-v2/setup/requirements.txt b/crypto/andromeda-cloud-storage-v2/setup/requirements.txt new file mode 100644 index 0000000..9228e6a --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/setup/requirements.txt @@ -0,0 +1,2 @@ +pycryptodome==3.20.0 +pytz==2024.1 diff --git a/crypto/andromeda-cloud-storage-v2/setup/secret.py b/crypto/andromeda-cloud-storage-v2/setup/secret.py new file mode 100644 index 0000000..bbbc71a --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/setup/secret.py @@ -0,0 +1 @@ +FLAG = b"\x00"*16 + b"CCSC{p4dd1ng_oracl3_att4ck_due_to_flawed_implem3ntati0n_of_decryption!9f03268fc9363db00c7a43}" + b"\x00\xff" diff --git a/crypto/andromeda-cloud-storage-v2/setup/server.py b/crypto/andromeda-cloud-storage-v2/setup/server.py new file mode 100644 index 0000000..c0c5e02 --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/setup/server.py @@ -0,0 +1,388 @@ +#!/usr/bin/env python3 +import socketserver +import json +import sys +import secrets + +from datetime import datetime +from pytz import timezone +tz = datetime.now(timezone("EET")) +tZ = datetime.now(timezone("CET")) + +from base64 import b64encode + +from Crypto.Hash import HMAC, SHA256 +from Crypto.Util.Padding import pad, unpad +from Crypto.Protocol.KDF import bcrypt, bcrypt_check +from Crypto.Cipher import AES + +from secret import FLAG + + +PORT = 1337 + +def blockify(msg: bytes, block_size: int): + block = [msg[i : i + block_size] for i in range(0, len(msg), block_size)] + return block + + +def xor(X: bytes, Y: bytes): + return bytes(x ^ y for (x, y) in zip(X, Y)) + + +class SE: + + def __init__(self, key_enc=None, key_mac=None): + self.key_enc = key_enc + self.key_mac = key_mac + if self.key_enc is None: + self.key_enc = secrets.token_bytes(16) + + if self.key_mac is None: + self.key_mac = secrets.token_bytes(16) + + def encrypt(self, ptxt: bytes, iv=None): + prev_c = iv + prev_p = bytes([0]) * AES.block_size + if prev_c is None: + prev_c = secrets.token_bytes(AES.block_size) + + ptxt_padded = pad(ptxt, AES.block_size, style="x923") + blocks = blockify(ptxt_padded, AES.block_size) + cipher = AES.new(self.key_enc, AES.MODE_ECB) + + ctxt = prev_c + for block in blocks: + mask = xor(prev_p, prev_c) + interm = xor(mask, block) + prev_c = cipher.encrypt(interm) + ctxt += prev_c + prev_p = block + + mac = HMAC.new(self.key_mac, digestmod=SHA256) + mac.update(ctxt) + + return ctxt + mac.digest() + + def decrypt(self, ctxt: bytes): + if (len(ctxt) <= AES.block_size + SHA256.digest_size) and ( + len(ctxt) % AES.block_size != 0 + ): + raise ValueError("Decryption failed.") + iv = ctxt[: AES.block_size] + blocks = blockify(ctxt[AES.block_size : -SHA256.digest_size], AES.block_size) + cipher = AES.new(self.key_enc, AES.MODE_ECB) + tag = ctxt[-SHA256.digest_size :] + + prev_c = iv + prev_p = bytes([0]) * AES.block_size + + ptxt_padded = b"" + for block in blocks: + interm = cipher.decrypt(block) + mask = xor(prev_p, prev_c) + prev_p = xor(mask, interm) + ptxt_padded += prev_p + prev_c = block + + ptxt = unpad(ptxt_padded, AES.block_size, style="x923") + + mac = HMAC.new(self.key_mac, digestmod=SHA256) + mac.update(ctxt[: -SHA256.digest_size]) + verified = True + try: + mac.verify(tag) + except ValueError: + verified = False + + return ptxt, verified + + +class Server: + def __init__(self, flag, stdin=sys.stdin.buffer, stdout=sys.stdout.buffer): + self.stdin = stdin + self.stdout = stdout + + self.flag = flag + + self.registered_users = {} + self.current_user = None + + self.version = b"v2.0.0 Andromeda" + self.MAX_TITLE_LEN = 2**8 - 1 + self.MAX_FLAG_LEN = 2**8 - 1 + + def send_message(self, msg: dict): + self.stdout.write(json.dumps(msg).encode() + b"\n") + self.stdout.flush() + + def read_message(self) -> dict: + return json.loads(self.stdin.readline()) + + def main(self): + try: + while True: + try: + self.handle_command() + except ( + TypeError, + KeyError, + ValueError, + json.decoder.JSONDecodeError, + ) as e: + self.send_message( + { + "res": f"Failed to execute command: {type(e).__name__}: {str(e)}" + } + ) + except BrokenPipeError: + pass + + def handle_command(self): + msg = self.read_message() + command = msg["command"] + + match command: + case "register": + self.register_handler(msg) + case "login": + self.login_handler(msg) + case "logout": + self.logout_handler() + case "edit_title": + self.edit_title_handler(msg) + case "delete_data": + self.delete_data_handler(msg) + case "insert_data": + self.insert_data_handler(msg) + case "encrypted_backup": + self.encrypted_backup_handler() + case "read_all": + self.read_all_handler() + case "restore_backup": + self.restore_backup_handler(msg) + case _: + raise ValueError("No such command") + + def register_handler(self, msg): + username = msg["username"] + password = msg["password"] + + if not isinstance(username, str) or not isinstance(password, str): + self.send_message( + {"res": "The provided username or password is not a string."} + ) + return + + if username in self.registered_users: + self.send_message({"res": "Registration failed."}) + return + + # Who cares about password policies. + self.registered_users[username] = {} + user = self.registered_users[username] + user["hash"] = bcrypt(b64encode(SHA256.new(password.encode()).digest()), 12) + user["key_enc"] = secrets.token_bytes(16) + user["key_mac"] = secrets.token_bytes(16) + user.update( + { + "version": self.version, + "flag": self.flag, + "title": b"Placeholder", + "data": b"", + } + ) + + self.send_message( + { + "res": f"Your registration was succesful. You will soon receive the AES and MAC keys by post. Please be patient, the mail will soon dispatch from Andromeda. In the meantime, you can edit the title and the data in your free storage." + } + ) + + def login_handler(self, msg): + if self.current_user is not None: + self.send_message({"res": "Logout first."}) + return + + username = msg["username"] + password = msg["password"] + + if username not in self.registered_users: + self.send_message({"res": "Login failed."}) + return + + b64pwd = b64encode(SHA256.new(password.encode()).digest()) + try: + bcrypt_check(b64pwd, self.registered_users[username]["hash"]) + except ValueError: + self.send_message({"res": "Login failed."}) + return + + self.current_user = username + self.send_message( + {"res": f"Welcome to our Andromeda Cloud Storage, {self.current_user}!"} + ) + + def logout_handler(self): + if self.current_user is None: + self.send_message({"res": "Already logged out."}) + else: + self.current_user = None + self.send_message({"res": "Succesfully logged out."}) + + def edit_title_handler(self, msg): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + new_title = bytes.fromhex(msg["new_title"]) + + if len(new_title) > self.MAX_TITLE_LEN: + self.send_message( + {"res": "Title is longer than the max allowed length (255)"} + ) + return + user = self.registered_users[self.current_user] + user["title"] = new_title + self.send_message({"res": "Title was succesfully updated."}) + + def delete_data_handler(self, msg): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + start_idx = msg["start_idx"] + end_idx = msg["end_idx"] + user = self.registered_users[self.current_user] + if not (0 <= start_idx <= end_idx): + self.send_message({"res": "Please provide a valid range."}) + return + + user["data"] = user["data"][:start_idx] + user["data"][end_idx + 1 :] + + self.send_message({"res": "Data range was succesfully deleted."}) + + def insert_data_handler(self, msg): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + insert_idx = msg["insert_idx"] + new_data = bytes.fromhex(msg["new_data"]) + + user = self.registered_users[self.current_user] + if insert_idx < 0: + self.send_message({"res": "Please provide a valid insertion point."}) + return + + user["data"] = user["data"][:insert_idx] + new_data + user["data"][insert_idx:] + + self.send_message({"res": "Data was succesfully inserted."}) + + def encrypted_backup_handler(self): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + user = self.registered_users[self.current_user] + + flag_len = len(user["flag"]) + title_len = len(user["title"]) + serialized_backup = ( + user["version"] + + flag_len.to_bytes(1, "big") + + user["flag"] + + title_len.to_bytes(1, "big") + + user["title"] + + user["data"] + ) + + cipher = SE(user["key_enc"], user["key_mac"]) + ctxt = cipher.encrypt(serialized_backup) + self.send_message( + { + "enc_backup": ctxt.hex(), + } + ) + + def read_all_handler(self): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + user = self.registered_users[self.current_user] + + self.send_message( + { + "title": user["title"].hex(), + "data": user["data"].hex(), + } + ) + + def restore_backup_handler(self, msg): + if self.current_user is None: + self.send_message({"res": "Please login first."}) + return + + enc_backup = bytes.fromhex(msg["enc_backup"]) + + user = self.registered_users[self.current_user] + cipher = SE(user["key_enc"], user["key_mac"]) + try: + serialized_backup, verified = cipher.decrypt(enc_backup) + if not verified: + self.send_message({"res": f"Decryption failed ({tz.strftime('%d-%m-%y %H:%M')})"}) + return + except: + self.send_message({"res": f"Decryption failed ({tZ.strftime('%d-%m-%y %H:%M')})"}) + return + + pos = 0 + version = serialized_backup[pos : pos + 16] + if version != self.version or len(version) != 16: + self.send_message({"res": "Invalid format."}) + return + pos += 16 + try: + flag_len = serialized_backup[pos] + except: + self.send_message({"res": "Invalid format."}) + return + pos += 1 + if len(serialized_backup[pos:]) < flag_len: + self.send_message({"res": "Invalid format."}) + return + flag = serialized_backup[pos : pos + flag_len] + pos += flag_len + try: + title_len = serialized_backup[pos] + except: + self.send_message({"res": "Invalid format."}) + return + pos += 1 + if len(serialized_backup[pos:]) < title_len: + self.send_message({"res": "Invalid format."}) + return + title = serialized_backup[pos : pos + title_len] + pos += title_len + data = serialized_backup[pos:] + + user["version"] = version + user["flag"] = flag + user["title"] = title + user["data"] = data + + self.send_message({"res": "Backup restored."}) + + +if __name__ == "__main__": + + class RequestHandler(socketserver.StreamRequestHandler): + def handle(self): + server = Server(flag=FLAG, stdin=self.rfile, stdout=self.wfile) + server.main() + + class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + + TCPServer(("0.0.0.0", PORT), RequestHandler).serve_forever() diff --git a/crypto/andromeda-cloud-storage-v2/solution/solve.py b/crypto/andromeda-cloud-storage-v2/solution/solve.py new file mode 100644 index 0000000..24fc203 --- /dev/null +++ b/crypto/andromeda-cloud-storage-v2/solution/solve.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +import json +import os + +from telnetlib import Telnet +from datetime import datetime +from pytz import timezone + +from Crypto.Cipher import AES +from Crypto.Hash import SHA256 + + +def blockify(msg: bytes, block_size: int): + block = [msg[i : i + block_size] for i in range(0, len(msg), block_size)] + return block + + +def xor(X: bytes, Y: bytes): + return bytes(x ^ y for (x, y) in zip(X, Y)) + + +def readline(tn: Telnet): + return tn.read_until(b"\n") + + +def json_recv(tn: Telnet): + line = readline(tn) + return json.loads(line.decode("utf-8")) + + +def json_send(tn: Telnet, req): + request = json.dumps(req).encode("utf-8") + tn.write(request + b"\n") + + +def register(tn: Telnet, username: str, password: str): + request = { + "command": "register", + "username": username, + "password": password, + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def login(tn: Telnet, username: str, password: str): + request = { + "command": "login", + "username": username, + "password": password, + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def logout(tn: Telnet): + request = { + "command": "logout", + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def edit_title(tn: Telnet, new_title: bytes): + request = { + "command": "edit_title", + "new_title": new_title.hex(), + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def delete_data(tn: Telnet, start_idx: int, end_idx: int): + request = { + "command": "delete_data", + "start_idx": start_idx, + "end_idx": end_idx, + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def insert_data(tn: Telnet, insert_idx: int, new_data: bytes): + request = { + "command": "insert_data", + "insert_idx": insert_idx, + "new_data": new_data.hex(), + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def get_encrypted_backup(tn: Telnet): + request = { + "command": "encrypted_backup", + } + json_send(tn, request) + res = json_recv(tn) + return bytes.fromhex(res["enc_backup"]) + + +def read_all(tn: Telnet): + request = { + "command": "read_all", + } + json_send(tn, request) + res = json_recv(tn) + return bytes.fromhex(res["title"]), bytes.fromhex(res["data"]) + + +def restore_backup(tn: Telnet, enc_backup: bytes): + request = { + "command": "restore_backup", + "enc_backup": enc_backup.hex(), + } + json_send(tn, request) + res = json_recv(tn) + return res["res"] + + +def oracle_valid_padding(msg: str): + hour = int(msg[-6:-4]) + tZ = datetime.now(timezone("CET")) + padding_hour = tZ.hour + if hour == padding_hour: + return False + else: + return True + + +def attack(tn: Telnet): + username = "ccsc" + password = "ccsc" + + _ = register(tn, username, password) + _ = login(tn, username, password) + + enc_backup = get_encrypted_backup(tn) + + iv = enc_backup[: AES.block_size] + c_blocks = blockify( + enc_backup[AES.block_size : -SHA256.digest_size], AES.block_size + ) + tag = enc_backup[-SHA256.digest_size :] + + guess_blocks = [] + prev_c = iv + prev_p = bytes([0]) * AES.block_size + for current_c in c_blocks: + p_guess = b"" + mask_rhs = b"" + # find the iv block that results into a valid padding of size AES.block_size + for i in range(AES.block_size): + mask_lhs = b"\x00" * (AES.block_size - i - 1) + # find the byte that results into a valid padding of size i+1 + for b in range(256): + last_mask_rhs = bytes([mask_rhs[-1] ^ i ^ (i + 1)]) if i > 0 else b"" + guess_mask = mask_lhs + bytes([b]) + mask_rhs[:-1] + last_mask_rhs + guess_iv = xor(xor(prev_c, prev_p), guess_mask) + res = restore_backup(tn, guess_iv + current_c + tag) + if oracle_valid_padding(res): + # check for edge case when the second, (and third, and forth...) + # last byte(s) is/are the zero byte. It requires an additional + # padding oracle query that modifies the second last byte just + # to break a valid padding that occurs by chance. + if i == 0: + guess_mask = mask_lhs[:-1] + b"\x01" + bytes([b]) + guess_iv = xor(xor(prev_c, prev_p), guess_mask) + res = restore_backup(tn, guess_iv + current_c + tag) + if not oracle_valid_padding(res): + continue + p_guess = bytes([b ^ (i + 1)]) + p_guess + mask_rhs = bytes([b]) + elif i != 0: + p_guess = bytes([b]) + p_guess + mask_rhs = ( + bytes([b]) + + mask_rhs[:-1] + + bytes([mask_rhs[-1] ^ i ^ (i + 1)]) + ) + break + prev_c = current_c + prev_p = p_guess + guess_blocks.append(p_guess) + + flag = b"".join(guess_blocks)[33:-18] + print(flag) + + +if __name__ == "__main__": + if "REMOTE" in os.environ: + HOSTNAME = "" + else: + HOSTNAME = "localhost" + PORT = 1337 + with Telnet(HOSTNAME, PORT) as tn: + attack(tn)