Skip to content

Commit

Permalink
fix: handle truncation while assigning initial offsets
Browse files Browse the repository at this point in the history
Initial offsets never considered the current state of the file, instead
just preferring to use the state-saved offsets or defaulting to the
value configured by the `MZ_LOOKBACK` option. This meant truncation was
handled downstream, where the agent doesn't have enough contextual
information to handle the case appropriately, and logs could potentially
be dropped.

This change causes initial offsets to be ignored if they exceed the
current length of the file, falling back to the default values instead.

Ref: LOG-17217
Signed-off-by: Jacob Hull <[email protected]>
  • Loading branch information
jakedipity committed Dec 11, 2023
1 parent 5ff60e9 commit d71e6f1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 6 deletions.
73 changes: 73 additions & 0 deletions bin/tests/it/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,79 @@ fn test_append_and_move() {
agent_handle.kill().expect("Could not kill process");
}

#[tokio::test]
#[cfg_attr(not(feature = "integration_tests"), ignore)]
#[cfg(any(target_os = "windows", target_os = "linux"))]
async fn test_reuse_inode() {
    // Simulates a file being renamed away, truncated, appended to, and
    // renamed back — the watched path ends up reusing the same inode with
    // a shorter length, which the agent must handle without dropping lines.
    let watched_dir = tempdir().expect("Could not create temp dir").into_path();
    let stash_dir = tempdir().expect("Could not create temp dir").into_path();
    let file_path = watched_dir.join("file1.log");
    let mv_path = stash_dir.join("file1.log");
    // 300 lines before the truncate + 200 lines after.
    let total_lines = 500;

    let db_dir = tempdir().unwrap().into_path();
    let (server, received, shutdown_handle, addr) = common::start_http_ingester();

    let mut settings = AgentSettings::with_mock_ingester(watched_dir.to_str().unwrap(), &addr);
    settings.state_db_dir = Some(&db_dir);
    settings.lookback = Some("start");
    settings.exclusion_regex = Some(r"/var\w*");
    settings.log_level = Some("info,fs::cache=trace");

    let (server_result, _) = tokio::join!(server, async {
        let mut agent_handle = common::spawn_agent(settings);
        let mut stderr_reader = BufReader::new(agent_handle.stderr.take().unwrap());

        // Create the file empty first so the agent registers it before any
        // content is written.
        let _ = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&file_path)
            .unwrap();
        common::wait_for_file_event("initialize", &file_path, &mut stderr_reader);
        common::append_to_file(&file_path, 300, 300).expect("Could not append");
        common::force_client_to_flush(&watched_dir).await;

        // Move the file and truncate+append to simulate creating a new file with the same inode
        fs::rename(&file_path, &mv_path).expect("Could not rename file");
        common::truncate_file(&mv_path).expect("Could not truncate file");
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        common::append_to_file(&mv_path, 200, 200).expect("Could not append");
        fs::rename(&mv_path, &file_path).expect("Could not rename file");
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        common::force_client_to_flush(&watched_dir).await;
        consume_output(stderr_reader.into_inner());

        // Wait for agent to process lines
        tokio::time::sleep(tokio::time::Duration::from_millis(1_500)).await;

        let mut file_info = FileInfo::default();
        let map_key = file_path.to_str().unwrap();

        // Poll the mock ingester up to 10 times for the expected line count.
        let mut attempts_left = 10;
        while attempts_left > 0 {
            {
                // Scope the guard so the lock is never held across an await.
                let map = received.lock().await;
                file_info = map.get(map_key).unwrap_or(&file_info).clone();
            }

            if file_info.lines >= total_lines {
                break;
            }
            // Wait for the data to be received by the mock ingester
            tokio::time::sleep(Duration::from_millis(500)).await;
            attempts_left -= 1;
        }

        assert_eq!(file_info.values.len(), total_lines);

        common::assert_agent_running(&mut agent_handle);

        agent_handle.kill().expect("Could not kill process");
        shutdown_handle();
    });
    server_result.unwrap()
}

#[test]
#[cfg_attr(not(feature = "integration_tests"), ignore)]
#[cfg(any(target_os = "windows", target_os = "linux"))]
Expand Down
5 changes: 2 additions & 3 deletions bin/tests/it/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ pub async fn force_client_to_flush(dir_path: &Path) {
/// Truncate the file at `file_path` to zero length without deleting it.
///
/// Opening with `write(true).truncate(true)` clears the contents as part of
/// the open call itself, so no separate `set_len(0)` is required; the handle
/// is dropped immediately since only the truncation side effect matters.
/// Crucially, this keeps the same inode alive, which is what the inode-reuse
/// integration tests rely on.
///
/// # Errors
/// Returns any `std::io::Error` raised while opening the file (e.g. the file
/// does not exist or is not writable).
#[cfg(not(target_os = "macos"))]
pub fn truncate_file(file_path: &Path) -> Result<(), std::io::Error> {
    OpenOptions::new()
        .write(true)
        .truncate(true)
        .open(file_path)?;
    Ok(())
}

Expand Down
22 changes: 19 additions & 3 deletions common/fs/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,11 +867,11 @@ impl FileSystem {
None
}
}
let file_len = path.metadata().map(|m| m.len()).unwrap_or(0);
let default_offset = match self.lookback_config {
Lookback::Start => SpanVec::new(),
Lookback::SmallFiles => {
// Check the actual file len
let file_len = path.metadata().map(|m| m.len()).unwrap_or(0);
let smallfiles_offset = if file_len < 8192 {
SpanVec::new()
} else {
Expand All @@ -885,7 +885,6 @@ impl FileSystem {
.unwrap_or_default(),
Lookback::Tail => {
let mut should_lookback = false;
let file_len = path.metadata().map(|m| m.len()).unwrap_or(0);

if let Ok(metadata) = path.metadata() {
if let Ok(file_create_time) = metadata.created() {
Expand All @@ -908,7 +907,24 @@ impl FileSystem {
tail_offset
}
};
_lookup_offset(&self.initial_offsets, &inode, path).unwrap_or(default_offset)

let offsets = match _lookup_offset(&self.initial_offsets, &inode, path) {
Some(saved_offset) => {
if saved_offset[0].end > file_len {
warn!("inconsistent saved offsets");
default_offset
} else {
saved_offset
}
}
None => default_offset,
};

info!(
"initializing offset for {:?} to {:?} ({})",
path, offsets, self.lookback_config
);
offsets
}

pub fn resolve_valid_paths(&self, entry: &Entry, entries: &EntryMap) -> SmallVec<[PathBuf; 4]> {
Expand Down

0 comments on commit d71e6f1

Please sign in to comment.