pageserver: only store SLRUs on shard zero #9786

Draft · wants to merge 5 commits into base: main
7 changes: 6 additions & 1 deletion libs/pageserver_api/src/shard.rs
@@ -175,7 +175,12 @@ impl ShardIdentity {
///
/// Shards _may_ drop keys which return false here, but are not obliged to.
pub fn is_key_disposable(&self, key: &Key) -> bool {
- if key_is_shard0(key) {
+ if self.count < ShardCount(2) {
+ // Fast path: unsharded tenant doesn't dispose of anything
+ return false;
+ }
+
+ if key_is_shard0(key) && key.field1 != 0x01 {
// Q: Why can't we dispose of shard0 content if we're not shard 0?
// A1: because the WAL ingestion logic currently ingests some shard 0
// content on all shards, even though it's only read on shard 0. If we
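
For orientation, the sketch below is a minimal, self-contained model of the disposability rule in the hunk above, assuming (per the PR title) that field1 == 0x01 is the SLRU key class being moved to shard zero. ToyKey and ToyShard are illustrative stand-ins rather than the real pageserver_api types, and the modulo check merely stands in for the real stripe mapping.

    // Illustrative stand-ins only; not the real ShardIdentity / Key types.
    #[derive(Clone, Copy)]
    struct ToyKey {
        field1: u8, // key class; 0x01 plays the role of the SLRU class here
        hash: u32,  // pretend stripe hash used for shard routing
    }

    #[derive(Clone, Copy)]
    struct ToyShard {
        number: u32,
        count: u32, // 0 or 1 means "unsharded"
    }

    impl ToyShard {
        fn is_shard_zero(&self) -> bool {
            self.number == 0
        }

        // Keys that historically lived on every shard because WAL ingest wrote
        // them everywhere, even though only shard zero ever reads them.
        fn key_is_shard0(key: &ToyKey) -> bool {
            key.field1 != 0x00
        }

        // Mirrors the shape of ShardIdentity::is_key_disposable in the hunk above.
        fn is_key_disposable(&self, key: &ToyKey) -> bool {
            if self.count < 2 {
                // Fast path: an unsharded tenant never disposes of anything.
                return false;
            }
            if key.field1 == 0x01 {
                // With this change, SLRU-class keys live only on shard zero,
                // so every other shard may drop them.
                return !self.is_shard_zero();
            }
            if Self::key_is_shard0(key) {
                // Remaining shard-zero metadata is still ingested on all
                // shards, so no shard may drop it.
                return false;
            }
            // Ordinary data is disposable when this shard does not own it.
            key.hash % self.count != self.number
        }
    }

    fn main() {
        let shard0 = ToyShard { number: 0, count: 4 };
        let shard1 = ToyShard { number: 1, count: 4 };
        let slru_key = ToyKey { field1: 0x01, hash: 7 };
        assert!(!shard0.is_key_disposable(&slru_key)); // kept on shard zero
        assert!(shard1.is_key_disposable(&slru_key)); // droppable elsewhere
        println!("ok");
    }
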
56 changes: 36 additions & 20 deletions pageserver/src/pgdatadir_mapping.rs
@@ -338,6 +338,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.await?;
@@ -360,6 +361,7 @@
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await
}
@@ -372,6 +374,7 @@
version: Version<'_>,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
Ok(buf.get_u32_le())
@@ -385,6 +388,7 @@
version: Version<'_>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
@@ -855,26 +859,28 @@ impl Timeline {
}

// Iterate SLRUs next
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
if self.tenant_shard_id.is_shard_zero() {
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();

result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
}
}

@@ -1269,6 +1275,10 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}

self.put(
slru_block_to_key(kind, segno, blknum),
Value::WalRecord(rec),
Expand Down Expand Up @@ -1302,6 +1312,9 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
Expand Down Expand Up @@ -1343,6 +1356,9 @@ impl<'a> DatadirModification<'a> {
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
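
A compact way to see the write-path pattern in this file: WAL ingestion keeps calling the SLRU put_* helpers unconditionally and shards other than zero simply drop the write, just as the reads above assert shard zero and the keyspace collection skips SLRUs elsewhere. The model below is a sketch under those assumptions; ToyModification and ToySlruKind are illustrative stand-ins, not the real DatadirModification API.

    use std::collections::BTreeMap;

    // Illustrative stand-ins; not the real DatadirModification / SlruKind types.
    #[allow(dead_code)]
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    enum ToySlruKind {
        Clog,
        MultiXactMembers,
        MultiXactOffsets,
    }

    struct ToyModification {
        is_shard_zero: bool,
        // (kind, segno, blkno) -> buffered page image
        pending: BTreeMap<(ToySlruKind, u32, u32), Vec<u8>>,
    }

    impl ToyModification {
        // Mirrors the early-return added to put_slru_page_image and friends:
        // callers keep calling unconditionally, non-zero shards drop the write.
        fn put_slru_page_image(&mut self, kind: ToySlruKind, segno: u32, blkno: u32, img: Vec<u8>) {
            if !self.is_shard_zero {
                return;
            }
            self.pending.insert((kind, segno, blkno), img);
        }
    }

    fn main() {
        let mut shard0 = ToyModification { is_shard_zero: true, pending: BTreeMap::new() };
        let mut shard1 = ToyModification { is_shard_zero: false, pending: BTreeMap::new() };

        for m in [&mut shard0, &mut shard1] {
            m.put_slru_page_image(ToySlruKind::Clog, 0, 0, vec![0u8; 8192]);
        }

        assert_eq!(shard0.pending.len(), 1); // shard zero buffers the SLRU page
        assert!(shard1.pending.is_empty()); // other shards silently drop it
        println!("ok");
    }
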
23 changes: 23 additions & 0 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -197,6 +197,7 @@ use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::pausable_failpoint;
use utils::shard::ShardNumber;

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
@@ -2231,6 +2232,28 @@ impl RemoteTimelineClient {
UploadQueue::Initialized(x) => x.no_pending_work(),
}
}

/// 'foreign' in the sense that it does not belong to this tenant shard. This method
/// is used during GC for other shards to get the index of shard zero.
pub(crate) async fn download_foreign_index(
&self,
shard_number: ShardNumber,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
let foreign_shard_id = TenantShardId {
shard_number,
shard_count: self.tenant_shard_id.shard_count,
tenant_id: self.tenant_shard_id.tenant_id,
};
download_index_part(
&self.storage_impl,
&foreign_shard_id,
&self.timeline_id,
Generation::MAX,
cancel,
)
.await
}
}

pub(crate) struct UploadQueueAccessor<'a> {
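
To make the new helper concrete, here is a rough sketch of the two ideas behind download_foreign_index: derive shard zero's TenantShardId from this shard's own identity (same tenant and shard count, shard number zero), and take the newest index generation available. ToyShardId and the filename-based generation parsing are assumptions made for the example; the real code goes through download_index_part with Generation::MAX rather than listing files.

    // Illustrative stand-ins; the real code builds a TenantShardId and calls
    // download_index_part with Generation::MAX.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct ToyShardId {
        tenant: u128,
        shard_number: u8,
        shard_count: u8,
    }

    impl ToyShardId {
        // Same tenant and shard count as `self`, but pointing at another shard
        // (shard zero, for the GC-cutoff lookup).
        fn foreign(&self, shard_number: u8) -> ToyShardId {
            ToyShardId { shard_number, ..*self }
        }
    }

    // Pick the newest index among "index_part.json-<hex generation>" names.
    fn latest_index<'a>(listing: &[&'a str]) -> Option<&'a str> {
        listing
            .iter()
            .filter(|name| name.starts_with("index_part.json"))
            .max_by_key(|name| {
                name.rsplit('-')
                    .next()
                    .and_then(|g| u32::from_str_radix(g, 16).ok())
                    .unwrap_or(0)
            })
            .copied()
    }

    fn main() {
        let me = ToyShardId { tenant: 1, shard_number: 3, shard_count: 8 };
        let zero = me.foreign(0);
        assert_eq!((zero.shard_number, zero.shard_count), (0, 8));

        let listing = ["index_part.json-00000002", "index_part.json-0000000a"];
        assert_eq!(latest_index(&listing), Some("index_part.json-0000000a"));
        println!("ok");
    }
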
116 changes: 82 additions & 34 deletions pageserver/src/tenant/timeline.rs
@@ -38,6 +38,7 @@ use pageserver_api::{
shard::{ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
@@ -4774,6 +4775,86 @@ impl Timeline {
Ok(())
}

async fn find_gc_time_cutoff(
&self,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Lsn>, PageReconstructError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if self.shard_identity.is_shard_zero() {
// Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself
let now = SystemTime::now();
let time_range = if pitr == Duration::ZERO {
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
} else {
pitr
};

// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
let timestamp = to_pg_timestamp(time_cutoff);

let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
LsnForTimestamp::Present(lsn) => Some(lsn),
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,
// but what it really means is that there hasn't been
// any commits since the cutoff timestamp.
//
// In this case we should use the LSN of the most recent commit,
// which is implicitly the last LSN in the log.
debug!("future({})", lsn);
Some(self.get_last_record_lsn())
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
None
}
LsnForTimestamp::NoData(lsn) => {
debug!("nodata({})", lsn);
None
}
};
Ok(time_cutoff)
} else {
// Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff
// from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that
// the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a
// space cutoff that we would also have respected ourselves.
match self
.remote_client
.download_foreign_index(ShardNumber(0), cancel)
.await
{
Ok((index_part, index_generation, _index_mtime)) => {
tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}",
index_part.metadata.latest_gc_cutoff_lsn());
Ok(Some(index_part.metadata.latest_gc_cutoff_lsn()))
}
Err(DownloadError::NotFound) => {
// This is unexpected, because during timeline creations shard zero persists to remote
// storage before other shards are called, and during timeline deletion non-zeroth shards are
// deleted before the zeroth one. However, it should be harmless: if we somehow end up in this
// state, then shard zero should _eventually_ write an index when it GCs.
tracing::warn!("GC couldn't find shard zero's index for timeline");
Ok(None)
}
Err(e) => {
// TODO: this function should return a different error type than page reconstruct error
Err(PageReconstructError::Other(anyhow::anyhow!(e)))
}
}

// TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage
// controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero
// is going through a migration if we read the old location's index and it has GC'd ahead of the
// new location. This is legal in principle, but problematic in practice because it might result
// in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards
// because they have GC'd past the branch point.
}
}

/// Find the Lsns above which layer files need to be retained on
/// garbage collection.
///
@@ -4816,40 +4897,7 @@ impl Timeline {
// - if PITR interval is set, then this is our cutoff.
// - if PITR interval is not set, then we do a lookup
// based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases.
- let time_cutoff = {
- let now = SystemTime::now();
- let time_range = if pitr == Duration::ZERO {
- humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
- } else {
- pitr
- };
-
- // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
- let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
- let timestamp = to_pg_timestamp(time_cutoff);
-
- match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
- LsnForTimestamp::Present(lsn) => Some(lsn),
- LsnForTimestamp::Future(lsn) => {
- // The timestamp is in the future. That sounds impossible,
- // but what it really means is that there hasn't been
- // any commits since the cutoff timestamp.
- //
- // In this case we should use the LSN of the most recent commit,
- // which is implicitly the last LSN in the log.
- debug!("future({})", lsn);
- Some(self.get_last_record_lsn())
- }
- LsnForTimestamp::Past(lsn) => {
- debug!("past({})", lsn);
- None
- }
- LsnForTimestamp::NoData(lsn) => {
- debug!("nodata({})", lsn);
- None
- }
- }
- };
+ let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?;

Ok(match (pitr, time_cutoff) {
(Duration::ZERO, Some(time_cutoff)) => {
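
The control flow of find_gc_time_cutoff is easier to see with the pageserver plumbing stripped away. The sketch below is a model under stated assumptions: Lsn is a plain u64, lsn_for_timestamp stands in for find_lsn_for_timestamp, shard_zero_gc_cutoff_from_index stands in for the download_foreign_index lookup, and a seven-day constant stands in for DEFAULT_PITR_INTERVAL (the real default may differ).

    use std::time::{Duration, SystemTime};

    // Illustrative stand-ins; the real code uses Lsn, find_lsn_for_timestamp
    // and download_foreign_index.
    type Lsn = u64;

    fn lsn_for_timestamp(_cutoff: SystemTime) -> Option<Lsn> {
        Some(0x4000_0000) // pretend a commit exists at or before the cutoff
    }

    fn shard_zero_gc_cutoff_from_index() -> Option<Lsn> {
        Some(0x3000_0000) // pretend shard zero's index advertises this cutoff
    }

    fn find_gc_time_cutoff(is_shard_zero: bool, pitr: Duration) -> Option<Lsn> {
        if is_shard_zero {
            // Shard zero has the SLRU data needed for timestamp -> LSN lookups.
            let time_range = if pitr == Duration::ZERO {
                Duration::from_secs(7 * 24 * 3600) // stand-in for DEFAULT_PITR_INTERVAL
            } else {
                pitr
            };
            // If this underflows, fall back to `now`, i.e. retain no history.
            let time_cutoff = SystemTime::now()
                .checked_sub(time_range)
                .unwrap_or_else(SystemTime::now);
            lsn_for_timestamp(time_cutoff)
        } else {
            // Other shards cannot do the lookup themselves; they reuse whatever
            // cutoff shard zero last persisted. A missing index (None) simply
            // means "no time-based cutoff yet".
            shard_zero_gc_cutoff_from_index()
        }
    }

    fn main() {
        println!("shard 0 cutoff: {:?}", find_gc_time_cutoff(true, Duration::ZERO));
        println!("shard 3 cutoff: {:?}", find_gc_time_cutoff(false, Duration::from_secs(86_400)));
    }
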
10 changes: 7 additions & 3 deletions pageserver/src/walingest.rs
@@ -162,7 +162,7 @@ impl WalIngest {
assert!(!modification.has_dirty_data());
}

- assert!(!self.checkpoint_modified);
+ assert!(!self.checkpoint_modified || !self.shard.is_shard_zero());
if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
&& self.checkpoint.update_next_xid(interpreted.xid)
{
@@ -277,8 +277,8 @@ impl WalIngest {
.ingest_batch(interpreted.batch, &self.shard, ctx)
.await?;

- // If checkpoint data was updated, store the new version in the repository
- if self.checkpoint_modified {
+ // If checkpoint data was updated, store the new version in the repository (on shard zero only)
+ if self.checkpoint_modified && self.shard.is_shard_zero() {
let new_checkpoint_bytes = self.checkpoint.encode()?;

modification.put_checkpoint(new_checkpoint_bytes)?;
@@ -1372,6 +1372,10 @@ impl WalIngest {
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
if !self.shard.is_shard_zero() {
return Ok(());
}

self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?;
modification.put_slru_page_image(kind, segno, blknum, img)?;
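
The relaxed assertion and the shard-zero-gated checkpoint write above belong together: any shard may mark its in-memory checkpoint as modified while decoding WAL, but only shard zero persists it, so leftover modifications are only a bug on shard zero. A minimal model of that flow, with ToyWalIngest as an illustrative stand-in for WalIngest:

    // Illustrative stand-in for WalIngest; `stored` models what gets persisted.
    struct ToyWalIngest {
        is_shard_zero: bool,
        checkpoint_modified: bool,
    }

    impl ToyWalIngest {
        fn ingest_record(&mut self, record_updates_next_xid: bool) -> Vec<&'static str> {
            let mut stored = Vec::new();

            // Relaxed from `assert!(!self.checkpoint_modified)`: a leftover
            // modification is only a bug on shard zero, the one shard that
            // actually writes the checkpoint out.
            assert!(!self.checkpoint_modified || !self.is_shard_zero);

            if record_updates_next_xid {
                self.checkpoint_modified = true;
            }

            // Only shard zero materializes the updated checkpoint image.
            if self.checkpoint_modified && self.is_shard_zero {
                stored.push("checkpoint");
                self.checkpoint_modified = false;
            }
            stored
        }
    }

    fn main() {
        let mut shard0 = ToyWalIngest { is_shard_zero: true, checkpoint_modified: false };
        let mut shard1 = ToyWalIngest { is_shard_zero: false, checkpoint_modified: false };

        assert_eq!(shard0.ingest_record(true), vec!["checkpoint"]);
        // Shard 1 flags the modification but never stores it; the relaxed
        // assert tolerates the flag still being set on the next record.
        assert!(shard1.ingest_record(true).is_empty());
        assert!(shard1.ingest_record(false).is_empty());
        println!("ok");
    }
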
16 changes: 11 additions & 5 deletions test_runner/fixtures/remote_storage.py
@@ -77,14 +77,16 @@ def kill(self):
class LocalFsStorage:
root: Path

- def tenant_path(self, tenant_id: TenantId) -> Path:
+ def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path:
return self.root / "tenants" / str(tenant_id)

- def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
+ def timeline_path(
+ self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
+ ) -> Path:
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)

def timeline_latest_generation(
- self, tenant_id: TenantId, timeline_id: TimelineId
+ self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> Optional[int]:
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
index_parts = [f for f in timeline_files if f.startswith("index_part")]
@@ -102,7 +104,9 @@ def parse_gen(filename: str) -> Optional[int]:
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
return generations[-1]

- def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
+ def index_path(
+ self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
+ ) -> Path:
latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
if latest_gen is None:
filename = TIMELINE_INDEX_PART_FILE_NAME
@@ -126,7 +130,9 @@ def remote_layer_path(
filename = f"{local_name}-{generation:08x}"
return self.timeline_path(tenant_id, timeline_id) / filename

- def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
+ def index_content(
+ self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
+ ) -> Any:
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)

6 changes: 5 additions & 1 deletion test_runner/regress/test_pg_regress.py
@@ -110,13 +110,17 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End

check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)

- # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
+ # Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create.
# There should have been compactions mid-test as well; this final check is in addition to those.
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
pageserver.http_client().timeline_checkpoint(
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
)

pageserver.http_client().timeline_gc(
shard, env.initial_timeline, None
)


# Run the main PostgreSQL regression tests, in src/test/regress.
#