Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Root and volume overlay filesystem #99

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ya-runtime-vm"
version = "0.2.8"
version = "0.2.9"
authors = ["Golem Factory <[email protected]>"]
edition = "2018"
license = "GPL-3.0"
209 changes: 157 additions & 52 deletions runtime/init-container/src/init.c
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <getopt.h>

#include "communication.h"
#include "cyclic_buffer.h"
@@ -40,9 +41,14 @@
.type = REDIRECT_FD_FILE, \
.path = NULL, \
}

#define MODE_RW_UGO (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)

#define USER_FS_TAG "userfs"
#define OUTPUT_PATH_PREFIX "/var/tmp/guest_agent_private/fds"
#define VOLUMES_PATH_PREFIX "/var/tmp/guest_agent_private/vols"

const char* ARG_FS_RAM = "ram";
const char* ARG_FS_RAM_TMP = "ram-tmp";

struct new_process_args {
char* bin;
@@ -75,6 +81,9 @@ static int g_sig_fd = -1;
static int g_epoll_fd = -1;

static struct process_desc* g_entrypoint_desc = NULL;
static struct cmd_args {
char* fs;
} args = { NULL };

static noreturn void die(void) {
sync();
@@ -97,6 +106,15 @@ static noreturn void die(void) {
_x; \
})

#define ALLOCC(x) ({ \
__typeof__(x) _x = (x); \
if (!_x) { \
fprintf(stderr, "OOM at %s:%d: %m\n", __FILE__, __LINE__); \
die(); \
} \
_x; \
})

static void load_module(const char* path) {
int fd = CHECK(open(path, O_RDONLY | O_CLOEXEC));
CHECK(syscall(SYS_finit_module, fd, "", 0));
@@ -260,39 +278,51 @@ static void setup_sigfd(void) {
g_sig_fd = CHECK(signalfd(g_sig_fd, &set, SFD_CLOEXEC));
}

static int create_dir_path(char* path) {
static int mkdirp(const char* full_path, mode_t mode) {
assert(path[0] == '/');

char* path = ALLOCC(strdup(full_path));
char* next = path;
int code = 0;

while (1) {
next = strchr(next + 1, '/');
if (!next) {
break;
}
*next = '\0';
int ret = mkdir(path, DEFAULT_DIR_PERMS);
int ret = mkdir(path, mode);
*next = '/';
if (ret < 0 && errno != EEXIST) {
return -1;
code = -1;
goto end;
}
}

if (mkdir(path, DEFAULT_DIR_PERMS) < 0 && errno != EEXIST) {
return -1;
if (mkdir(path, mode) < 0 && errno != EEXIST) {
code = -1;
goto end;
}
return 0;

end:
free(path);
return code;
}

static void setup_agent_directories(void) {
char* path = strdup(OUTPUT_PATH_PREFIX);
if (!path) {
fprintf(stderr, "setup_agent_directories OOM\n");
die();
}

CHECK(create_dir_path(path));
char* paths[] = {
OUTPUT_PATH_PREFIX,
VOLUMES_PATH_PREFIX
};

free(path);
size_t n = sizeof(paths) / sizeof(paths[0]);
for (size_t i = 0; i < n; ++i) {
if (!paths[i]) {
fprintf(stderr, "setup_agent_directories OOM\n");
die();
}
CHECK(mkdirp(paths[i], DEFAULT_DIR_PERMS));
}
}

static void send_response_hdr(msg_id_t msg_id, enum GUEST_MSG_TYPE type) {
@@ -878,13 +908,53 @@ static void handle_kill_process(msg_id_t msg_id) {
}

static uint32_t do_mount(const char* tag, char* path) {
if (create_dir_path(path) < 0) {
return errno;
char* args_9p = "defaults,trans=virtio,version=9p2000.L,nodevmap,redirect_dir=on";
char* args = NULL;

size_t dirs_sz = 5;
char** dirs = NULL;

if ((dirs = malloc(sizeof(char*) * dirs_sz)) == NULL) {
return -ENOMEM;
}
if (mount(tag, path, "9p", 0, "trans=virtio,version=9p2000.L") < 0) {
return errno;
memset(dirs, 0, dirs_sz);

ALLOCC(asprintf(&dirs[0], "%s/%s", VOLUMES_PATH_PREFIX, tag));
ALLOCC(asprintf(&dirs[1], "%s/mnt", dirs[0]));
ALLOCC(asprintf(&dirs[2], "%s/lower", dirs[0]));
ALLOCC(asprintf(&dirs[3], "%s/upper", dirs[1]));
ALLOCC(asprintf(&dirs[4], "%s/work", dirs[1]));
ALLOCC(asprintf(&args,
"lowerdir=%s,upperdir=%s,workdir=%s",
dirs[2], dirs[3], dirs[4]));

if (mkdirp(path, S_IRWXU) != 0) goto end;
if (mkdirp(dirs[1], S_IRWXU) != 0) goto end;
if (mkdirp(dirs[2], S_IRWXU) != 0) goto end;

if (mount(tag, dirs[1], "9p", 0, args_9p) != 0) {
goto end;
}
return 0;

if (mount(path, dirs[2], "none", MS_BIND | MS_REC, NULL) != 0) {
goto end;
}

if (mkdirp(dirs[3], S_IRWXU) != 0) goto end;
if (mkdirp(dirs[4], S_IRWXU) != 0) goto end;

if (mount("overlay", path, "overlay", 0, args) != 0) {
goto end;
}

end:
if (dirs) {
for (size_t i = 0; i < dirs_sz; ++i) {
if (dirs[i]) free(dirs[i]);
}
free(dirs);
}
return errno == EEXIST ? 0 : errno;
}

static void handle_mount(msg_id_t msg_id) {
@@ -923,8 +993,8 @@ static void handle_mount(msg_id_t msg_id) {
ret = do_mount(tag, path);

out:
free(path);
free(tag);
if (path) free(path);
if (tag) free(tag);
if (ret) {
send_response_err(msg_id, ret);
} else {
@@ -1254,19 +1324,39 @@ static noreturn void main_loop(void) {
}
}

static void create_dir(const char *pathname, mode_t mode) {
if (mkdir(pathname, mode) < 0 && errno != EEXIST) {
fprintf(stderr, "mkdir(%s) failed with: %m\n", pathname);
die();
struct cmd_args parse_args(int argc, char *argv[]) {
int parsing = 1;
while (parsing) {
static struct option options[] = {
{"fs", required_argument, 0, 'f'},
{0, 0, 0, 0}
};

int i = 0;
int c = getopt_long(argc, argv, "f:", options, &i);

switch (c) {
case -1:
case 0:
parsing = 0;
break;
case 'f':
args.fs = optarg;
continue;
default:
fprintf(stderr, "arg: %c\n", c);
}
}

return args;
}

int main(void) {
int main(int argc, char *argv[]) {
setbuf(stdin, NULL);
setbuf(stdout, NULL);
setbuf(stderr, NULL);

create_dir("/dev", DEFAULT_DIR_PERMS);
CHECK(mkdirp("/dev", DEFAULT_DIR_PERMS));
CHECK(mount("devtmpfs", "/dev", "devtmpfs", MS_NOSUID,
"mode=0755,size=2M"));

@@ -1286,32 +1376,43 @@ int main(void) {

g_cmds_fd = CHECK(open("/dev/vport0p1", O_RDWR | O_CLOEXEC));

CHECK(mkdir("/mnt", S_IRWXU));
CHECK(mkdir("/mnt/image", S_IRWXU));
CHECK(mkdir("/mnt/overlay", S_IRWXU));
CHECK(mkdir("/mnt/newroot", DEFAULT_DIR_PERMS));

// 'workdir' and 'upperdir' have to be on the same filesystem
CHECK(mount("tmpfs", "/mnt/overlay", "tmpfs",
MS_NOSUID,
"mode=0777,size=128M"));

CHECK(mkdir("/mnt/overlay/upper", S_IRWXU));
CHECK(mkdir("/mnt/overlay/work", S_IRWXU));
struct cmd_args args = parse_args(argc, argv);

CHECK(mount("/dev/vda", "/mnt/image", "squashfs", MS_RDONLY, ""));
CHECK(mount("overlay", "/mnt/newroot", "overlay", 0,
"lowerdir=/mnt/image,upperdir=/mnt/overlay/upper,workdir=/mnt/overlay/work"));
CHECK(mkdirp("/mnt/imagefs", S_IRWXU));
CHECK(mkdirp("/mnt/userfs", S_IRWXU));
CHECK(mkdirp("/mnt/overlay", DEFAULT_DIR_PERMS));

CHECK(umount2("/dev", MNT_DETACH));
CHECK(mount("/dev/vda", "/mnt/imagefs", "squashfs",
MS_RDONLY,
NULL));

if (args.fs != NULL && strcmp(args.fs, ARG_FS_RAM) == 0) {
fprintf(stderr, "mounting root fs as tmpfs\n");
CHECK(mount("tmpfs", "/mnt/userfs", "tmpfs",
0,
"mode=0777,size=128M"));
} else {
fprintf(stderr, "mounting root fs as 9p\n");
CHECK(mount(USER_FS_TAG, "/mnt/userfs", "9p",
0,
"defaults,trans=virtio,version=9p2000.L,nodevmap,redirect_dir=on"));
}

CHECK(mkdirp("/mnt/userfs/upper", S_IRWXU));
CHECK(mkdirp("/mnt/userfs/work", S_IRWXU));

CHECK(chdir("/mnt/newroot"));
CHECK(mount("overlay", "/mnt/overlay", "overlay",
0,
"lowerdir=/mnt/imagefs,upperdir=/mnt/userfs/upper,workdir=/mnt/userfs/work"));

CHECK(umount2("/dev", MNT_DETACH));
CHECK(chdir("/mnt/overlay"));
CHECK(mount(".", "/", "none", MS_MOVE, NULL));
CHECK(chroot("."));
CHECK(chdir("/"));

create_dir("/dev", DEFAULT_DIR_PERMS);
create_dir("/tmp", DEFAULT_DIR_PERMS);
CHECK(mkdirp("/dev", DEFAULT_DIR_PERMS));
CHECK(mkdirp("/tmp", DEFAULT_DIR_PERMS));

CHECK(mount("proc", "/proc", "proc",
MS_NODEV | MS_NOSUID | MS_NOEXEC,
@@ -1322,12 +1423,16 @@ int main(void) {
CHECK(mount("devtmpfs", "/dev", "devtmpfs",
MS_NOSUID,
"exec,mode=0755,size=2M"));
CHECK(mount("tmpfs", "/tmp", "tmpfs",
MS_NOSUID,
"mode=0777"));

create_dir("/dev/pts", DEFAULT_DIR_PERMS);
create_dir("/dev/shm", DEFAULT_DIR_PERMS);
if (args.fs != NULL && strcmp(args.fs, ARG_FS_RAM_TMP) == 0) {
fprintf(stderr, "mounting /tmp as tmpfs\n");
CHECK(mount("tmpfs", "/tmp", "tmpfs",
MS_NOSUID,
"mode=0777"));
}

CHECK(mkdirp("/dev/pts", DEFAULT_DIR_PERMS));
CHECK(mkdirp("/dev/shm", DEFAULT_DIR_PERMS));

CHECK(mount("devpts", "/dev/pts", "devpts",
MS_NOSUID | MS_NOEXEC,
119 changes: 101 additions & 18 deletions runtime/src/deploy.rs
Original file line number Diff line number Diff line change
@@ -3,12 +3,14 @@ use crc::crc32;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::SeekFrom;
use std::path::PathBuf;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt;
use tokio_byteorder::LittleEndian;
use uuid::Uuid;
use ya_runtime_sdk::runtime_api::deploy::ContainerVolume;

const VOLUME_OVERLAY_UPPER_DIR: &'static str = "upper";

#[derive(Debug, Default, Deserialize, Serialize)]
pub struct Deployment {
#[serde(default)]
@@ -18,20 +20,87 @@ pub struct Deployment {
#[serde(default)]
pub task_package: PathBuf,
pub user: (u32, u32),
pub volumes: Vec<ContainerVolume>,
pub config: ContainerConfig,
pub volumes: Vec<Volume>,
pub config: Config,
}

#[derive(Debug, Default, Deserialize, Serialize)]
pub struct Config {
#[serde(flatten)]
pub container: ContainerConfig,
#[serde(rename = "Filesystem")]
#[serde(default)]
pub fs: Fs,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Volume {
pub base_name: String,
pub name: String,
pub path: String,
}

impl Volume {
pub fn tag(&self, idx: usize) -> String {
format!("mnt{}", idx)
}

pub fn base_dir<P: AsRef<Path>>(&self, path: P) -> String {
path.as_ref()
.join(&self.base_name)
.to_string_lossy()
.to_string()
}

pub fn dir<P: AsRef<Path>>(&self, path: P) -> String {
path.as_ref().join(&self.name).to_string_lossy().to_string()
}
}

impl Into<ContainerVolume> for Volume {
fn into(self) -> ContainerVolume {
ContainerVolume {
name: self.name,
path: self.path,
}
}
}

/// Root filesystem overlay mode
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum Fs {
/// Mount the overlay on disk (default)
Disk,
/// Keep the overlay in RAM (limit: 128 MB)
Ram,
/// Mount the overlay on disk but keep /tmp in RAM (limit: 128 MB)
RamTmp,
}

impl Fs {
pub fn in_memory(&self) -> bool {
match self {
Self::Ram => true,
_ => false,
}
}
}

impl Default for Fs {
fn default() -> Self {
Self::Disk
}
}

impl Deployment {
pub async fn try_from_input<Input>(
mut input: Input,
pub async fn try_from_input(
task_package: PathBuf,
cpu_cores: usize,
mem_mib: usize,
task_package: PathBuf,
) -> Result<Self, anyhow::Error>
where
Input: AsyncRead + AsyncSeek + Unpin,
{
) -> Result<Self, anyhow::Error> {
let mut input = tokio::fs::File::open(&task_package).await?;

let json_len: u32 = {
let mut buf = [0; 8];
input.seek(SeekFrom::End(-8)).await?;
@@ -55,24 +124,33 @@ impl Deployment {
return Err(anyhow::anyhow!("Invalid ContainerConfig crc32 sum"));
}

let config: ContainerConfig = serde_json::from_str(&json)?;
let config: Config = serde_json::from_str(&json)?;
Ok(Deployment {
cpu_cores,
mem_mib,
task_package,
user: parse_user(config.user.as_ref())?,
volumes: parse_volumes(config.volumes.as_ref()),
user: parse_user(config.container.user.as_ref())?,
volumes: parse_volumes(config.container.volumes.as_ref()),
config,
})
}

pub fn env(&self) -> Vec<&str> {
self.config
.container
.env
.as_ref()
.map(|v| v.iter().map(|s| s.as_str()).collect())
.unwrap_or_else(Vec::new)
}

pub fn volumes(&self) -> Vec<ContainerVolume> {
self.volumes.iter().cloned().map(Into::into).collect()
}

pub fn init_args(&self) -> String {
format!("-f {}", serde_json::to_string(&self.config.fs).unwrap())
}
}

fn parse_user(user: Option<&String>) -> anyhow::Result<(u32, u32)> {
@@ -93,16 +171,21 @@ fn parse_user(user: Option<&String>) -> anyhow::Result<(u32, u32)> {
Ok((uid, gid))
}

fn parse_volumes(volumes: Option<&HashMap<String, HashMap<(), ()>>>) -> Vec<ContainerVolume> {
fn parse_volumes(volumes: Option<&HashMap<String, HashMap<(), ()>>>) -> Vec<Volume> {
let volumes = match volumes {
Some(v) => v,
_ => return Vec::new(),
};
volumes
.keys()
.map(|key| ContainerVolume {
name: format!("vol-{}", Uuid::new_v4()),
path: key.to_string(),
.map(|key| {
let base_name = format!("vol-{}", Uuid::new_v4());
let name = format!("{}/{}", base_name, VOLUME_OVERLAY_UPPER_DIR);
Volume {
base_name,
name,
path: key.to_string(),
}
})
.collect()
}
78 changes: 44 additions & 34 deletions runtime/src/main.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ use ya_runtime_sdk::{
};
use ya_runtime_vm::{
cpu::CpuInfo,
deploy::Deployment,
deploy::{Deployment, Fs},
guest_agent_comm::{GuestAgent, Notification, RedirectFdType, RemoteCommandResult},
};

@@ -34,6 +34,7 @@ const FILE_INITRAMFS: &'static str = "initramfs.cpio.gz";
const FILE_TEST_IMAGE: &'static str = "self-test.gvmi";
const FILE_DEPLOYMENT: &'static str = "deployment.json";
const DEFAULT_CWD: &'static str = "/";
const USER_FS_TAG: &'static str = "userfs";

#[derive(StructOpt, Clone, Default)]
#[structopt(rename_all = "kebab-case")]
@@ -166,20 +167,17 @@ impl ya_runtime_sdk::Runtime for Runtime {

async fn deploy(workdir: PathBuf, cli: Cli) -> anyhow::Result<serialize::json::Value> {
let workdir = normalize_path(&workdir).await?;
let package_path = normalize_path(&cli.task_package.unwrap()).await?;
let package_file = fs::File::open(&package_path).await?;

let deployment = Deployment::try_from_input(
package_file,
cli.cpu_cores,
(cli.mem_gib * 1024.) as usize,
package_path,
)
.await
.expect("Error reading package metadata");
let package = normalize_path(&cli.task_package.unwrap()).await?;
let deployment =
Deployment::try_from_input(package, cli.cpu_cores, (cli.mem_gib * 1024.) as usize)
.await
.expect("Error reading package metadata");

if !deployment.config.fs.in_memory() {
fs::create_dir_all(workdir.join(USER_FS_TAG)).await?;
}
for vol in &deployment.volumes {
fs::create_dir_all(workdir.join(&vol.name)).await?;
fs::create_dir_all(vol.dir(&workdir)).await?;
}

fs::OpenOptions::new()
@@ -193,7 +191,7 @@ async fn deploy(workdir: PathBuf, cli: Cli) -> anyhow::Result<serialize::json::V

Ok(serialize::json::to_value(DeployResult {
valid: Ok(Default::default()),
vols: deployment.volumes,
vols: deployment.volumes(),
start_mode: StartMode::Blocking,
})?)
}
@@ -230,7 +228,7 @@ async fn start(
"-smp",
deployment.cpu_cores.to_string().as_str(),
"-append",
"console=ttyS0 panic=1",
format!("console=ttyS0 panic=1 - {}", deployment.init_args()).as_str(),
"-device",
"virtio-serial",
"-device",
@@ -252,12 +250,21 @@ async fn start(
"-no-reboot",
]);

for (idx, volume) in deployment.volumes.iter().enumerate() {
if !deployment.config.fs.in_memory() {
cmd.arg("-virtfs");
cmd.arg(format!(
"local,id={tag},path={path},security_model=none,mount_tag={tag}",
tag = format!("mnt{}", idx),
path = work_dir.join(&volume.name).to_string_lossy(),
"local,id={tag},path={path},security_model=mapped,mount_tag={tag}",
tag = USER_FS_TAG,
path = work_dir.join(USER_FS_TAG).to_string_lossy(),
));
}

for (idx, vol) in deployment.volumes.iter().enumerate() {
cmd.arg("-virtfs");
cmd.arg(format!(
"local,id={tag},path={path},security_model=mapped,mount_tag={tag}",
tag = vol.tag(idx),
path = vol.base_dir(&work_dir), // note the `base_dir` here
));
}

@@ -267,8 +274,7 @@ async fn start(
.kill_on_drop(true)
.spawn()?;

let stdout = runtime.stdout.take().unwrap();
spawn(reader_to_log(stdout));
spawn(reader_to_log(runtime.stdout.take().unwrap()));

let ga = GuestAgent::connected(socket_path, 10, move |notification, ga| {
let mut emitter = emitter.clone();
@@ -282,8 +288,8 @@ async fn start(

{
let mut ga = ga.lock().await;
for (idx, volume) in deployment.volumes.iter().enumerate() {
ga.mount(format!("mnt{}", idx).as_str(), volume.path.as_str())
for (idx, vol) in deployment.volumes.iter().enumerate() {
ga.mount(vol.tag(idx).as_str(), vol.path.as_str())
.await?
.expect("Mount failed");
}
@@ -306,14 +312,15 @@ async fn run_command(
let env = deployment.env();
let cwd = deployment
.config
.container
.working_dir
.as_ref()
.filter(|s| !s.trim().is_empty())
.map(|s| s.as_str())
.unwrap_or_else(|| DEFAULT_CWD);

log::debug!("got run process: {:?}", run);
log::debug!("work dir: {:?}", deployment.config.working_dir);
log::debug!("work dir: {:?}", deployment.config.container.working_dir);

let result = data
.ga()
@@ -397,18 +404,21 @@ async fn test() -> anyhow::Result<()> {
.expect("Test image not found");

println!("Task package: {}", task_package.display());
let runtime_data = RuntimeData {
runtime: None,
ga: None,
deployment: Some(Deployment {
cpu_cores: 1,
mem_mib: 128,
task_package,
..Deployment::default()
}),

let mut deployment = Deployment {
cpu_cores: 1,
mem_mib: 128,
task_package,
..Deployment::default()
};
deployment.config.fs = Fs::Ram;

let runtime = Runtime {
data: Arc::new(Mutex::new(runtime_data)),
data: Arc::new(Mutex::new(RuntimeData {
runtime: None,
ga: None,
deployment: Some(deployment),
})),
};

println!("Starting runtime");