Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove separate entity counting step at end of copy #5819

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 46 additions & 79 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use graph::{
use itertools::Itertools;

use crate::{
advisory_lock, catalog,
advisory_lock, catalog, deployment,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
relational::index::IndexList,
Expand Down Expand Up @@ -208,17 +208,17 @@ impl CopyState {
})
})
.collect::<Result<_, _>>()?;
tables.sort_by_key(|table| table.batch.dst.object.to_string());
tables.sort_by_key(|table| table.dst.object.to_string());

let values = tables
.iter()
.map(|table| {
(
cts::entity_type.eq(table.batch.dst.object.as_str()),
cts::entity_type.eq(table.dst.object.as_str()),
cts::dst.eq(dst.site.id),
cts::next_vid.eq(table.batch.next_vid()),
cts::target_vid.eq(table.batch.target_vid()),
cts::batch_size.eq(table.batch.batch_size()),
cts::next_vid.eq(table.batcher.next_vid()),
cts::target_vid.eq(table.batcher.target_vid()),
cts::batch_size.eq(table.batcher.batch_size() as i64),
)
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -294,51 +294,11 @@ pub(crate) fn source(
/// so that we can copy rows from one to the other with very little
/// transformation. See `CopyEntityBatchQuery` for the details of what
/// exactly that means
pub(crate) struct BatchCopy {
struct TableState {
src: Arc<Table>,
dst: Arc<Table>,
batcher: VidBatcher,
}

impl BatchCopy {
pub fn new(batcher: VidBatcher, src: Arc<Table>, dst: Arc<Table>) -> Self {
Self { src, dst, batcher }
}

/// Copy one batch of entities and update internal state so that the
/// next call to `run` will copy the next batch
pub fn run(&mut self, conn: &mut PgConnection) -> Result<Duration, StoreError> {
let (duration, _) = self.batcher.step(|start, end| {
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
.execute(conn)?;
Ok(())
})?;

Ok(duration)
}

pub fn finished(&self) -> bool {
self.batcher.finished()
}

/// The first `vid` that has not been copied yet
pub fn next_vid(&self) -> i64 {
self.batcher.next_vid()
}

/// The last `vid` that should be copied
pub fn target_vid(&self) -> i64 {
self.batcher.target_vid()
}

pub fn batch_size(&self) -> i64 {
self.batcher.batch_size() as i64
}
}

struct TableState {
batch: BatchCopy,
dst_site: Arc<Site>,
batcher: VidBatcher,
duration_ms: i64,
}

Expand All @@ -354,14 +314,16 @@ impl TableState {
let vid_range = VidRange::for_copy(conn, &src, target_block)?;
let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
Ok(Self {
batch: BatchCopy::new(batcher, src, dst),
src,
dst,
dst_site,
batcher,
duration_ms: 0,
})
}

fn finished(&self) -> bool {
self.batch.finished()
self.batcher.finished()
}

fn load(
Expand Down Expand Up @@ -427,11 +389,12 @@ impl TableState {
VidRange::new(current_vid, target_vid),
)?
.with_batch_size(size as usize);
let batch = BatchCopy::new(batcher, src, dst);

Ok(TableState {
batch,
src,
dst,
dst_site: dst_layout.site.clone(),
batcher,
duration_ms,
})
}
Expand Down Expand Up @@ -460,20 +423,20 @@ impl TableState {
update(
cts::table
.filter(cts::dst.eq(self.dst_site.id))
.filter(cts::entity_type.eq(self.batch.dst.object.as_str()))
.filter(cts::entity_type.eq(self.dst.object.as_str()))
.filter(cts::duration_ms.eq(0)),
)
.set(cts::started_at.eq(sql("now()")))
.execute(conn)?;
let values = (
cts::next_vid.eq(self.batch.next_vid()),
cts::batch_size.eq(self.batch.batch_size()),
cts::next_vid.eq(self.batcher.next_vid()),
cts::batch_size.eq(self.batcher.batch_size() as i64),
cts::duration_ms.eq(self.duration_ms),
);
update(
cts::table
.filter(cts::dst.eq(self.dst_site.id))
.filter(cts::entity_type.eq(self.batch.dst.object.as_str())),
.filter(cts::entity_type.eq(self.dst.object.as_str())),
)
.set(values)
.execute(conn)?;
Expand All @@ -486,7 +449,7 @@ impl TableState {
update(
cts::table
.filter(cts::dst.eq(self.dst_site.id))
.filter(cts::entity_type.eq(self.batch.dst.object.as_str())),
.filter(cts::entity_type.eq(self.dst.object.as_str())),
)
.set(cts::finished_at.eq(sql("now()")))
.execute(conn)?;
Expand All @@ -512,7 +475,17 @@ impl TableState {
}

fn copy_batch(&mut self, conn: &mut PgConnection) -> Result<Status, StoreError> {
let duration = self.batch.run(conn)?;
let (duration, count) = self.batcher.step(|start, end| {
let count = rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
.count_current()
.get_result::<i64>(conn)
.optional()?;
Ok(count.unwrap_or(0) as i32)
})?;

let count = count.unwrap_or(0);

deployment::update_entity_count(conn, &self.dst_site, count)?;
Copy link
Contributor

@mangas mangas Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this correctly: I thought that count_current would run the count for just the batch, with the goal of adding it to the previous result. Otherwise it would be counting everything at the end, as it did previously, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to count_current wraps the CopyEntityBatchQuery so that the overall query returns how many current versions were copied, i.e., we end up running something like

with copy_cte(current) as (
  insert into dst(...) select * from src where ... returning block_range @> i32::MAX)
select count(*) from copy_cte where current

For the select, copy_cte is a table that has one boolean column, which is true if the row we just inserted is a current version.

The query does two things at once: (1) it copies a batch (the inner insert statement) and (2) it returns how many of the copied rows are considered 'current', i.e. entity versions that should count toward the deployment's entity count. Line 488 then takes that count and adds it to the entity count for the deployment.

With this change, we never run a count query just by itself.


self.record_progress(conn, duration)?;

Expand All @@ -539,12 +512,12 @@ impl<'a> CopyProgress<'a> {
let target_vid: i64 = state
.tables
.iter()
.map(|table| table.batch.target_vid())
.map(|table| table.batcher.target_vid())
.sum();
let current_vid = state
.tables
.iter()
.map(|table| table.batch.next_vid())
.map(|table| table.batcher.next_vid())
.sum();
Self {
logger,
Expand Down Expand Up @@ -577,23 +550,23 @@ impl<'a> CopyProgress<'a> {
}
}

fn update(&mut self, batch: &BatchCopy) {
fn update(&mut self, entity_type: &EntityType, batcher: &VidBatcher) {
if self.last_log.elapsed() > LOG_INTERVAL {
info!(
self.logger,
"Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data",
Self::progress_pct(batch.next_vid(), batch.target_vid()),
batch.dst.object,
batch.next_vid(),
batch.target_vid(),
Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid)
Self::progress_pct(batcher.next_vid(), batcher.target_vid()),
entity_type,
batcher.next_vid(),
batcher.target_vid(),
Self::progress_pct(self.current_vid + batcher.next_vid(), self.target_vid)
);
self.last_log = Instant::now();
}
}

fn table_finished(&mut self, batch: &BatchCopy) {
self.current_vid += batch.next_vid();
fn table_finished(&mut self, batcher: &VidBatcher) {
self.current_vid += batcher.next_vid();
}

fn finished(&self) {
Expand Down Expand Up @@ -728,9 +701,9 @@ impl Connection {
if status == Status::Cancelled {
return Ok(status);
}
progress.update(&table.batch);
progress.update(&table.dst.object, &table.batcher);
}
progress.table_finished(&table.batch);
progress.table_finished(&table.batcher);
}

// Create indexes for all the attributes that were postponed at the start of
Expand All @@ -740,8 +713,8 @@ impl Connection {
for table in state.tables.iter() {
let arr = index_list.indexes_for_table(
&self.dst.site.namespace,
&table.batch.src.name.to_string(),
&table.batch.dst,
&table.src.name.to_string(),
&table.dst,
true,
true,
)?;
Expand All @@ -756,18 +729,12 @@ impl Connection {
// Here we need to skip those created in the first step for the old fields.
for table in state.tables.iter() {
let orig_colums = table
.batch
.src
.columns
.iter()
.map(|c| c.name.to_string())
.collect_vec();
for sql in table
.batch
.dst
.create_postponed_indexes(orig_colums)
.into_iter()
{
for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() {
let query = sql_query(sql);
query.execute(conn)?;
}
Expand Down
11 changes: 3 additions & 8 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,17 +1249,12 @@ pub fn update_entity_count(
Ok(())
}

/// Set the deployment's entity count to whatever `full_count_query` produces
pub fn set_entity_count(
conn: &mut PgConnection,
site: &Site,
full_count_query: &str,
) -> Result<(), StoreError> {
/// Set the deployment's entity count back to `0`
pub fn clear_entity_count(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> {
use subgraph_deployment as d;

let full_count_query = format!("({})", full_count_query);
update(d::table.filter(d::id.eq(site.id)))
.set(d::entity_count.eq(sql(&full_count_query)))
.set(d::entity_count.eq(BigDecimal::from(0)))
.execute(conn)?;
Ok(())
}
Expand Down
11 changes: 4 additions & 7 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ impl DeploymentStore {

let event = if truncate {
let event = layout.truncate_tables(conn)?;
deployment::set_entity_count(conn, site.as_ref(), layout.count_query.as_str())?;
deployment::clear_entity_count(conn, site.as_ref())?;
event
} else {
let (event, count) = layout.revert_block(conn, block)?;
Expand Down Expand Up @@ -1592,13 +1592,10 @@ impl DeploymentStore {
.number
.checked_add(1)
.expect("block numbers fit into an i32");
dst.revert_block(conn, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());
let (_, count) = dst.revert_block(conn, block_to_revert)?;
deployment::update_entity_count(conn, &dst.site, count)?;

let start = Instant::now();
deployment::set_entity_count(conn, &dst.site, &dst.count_query)?;
info!(logger, "Counted the entities";
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());

deployment::set_history_blocks(
Expand Down
27 changes: 4 additions & 23 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR};
use graph::data::subgraph::schema::POI_TABLE;
use graph::prelude::{
anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityOperation, Logger,
QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX,
QueryExecutionError, StoreError, StoreEvent, ValueType,
};

use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
Expand Down Expand Up @@ -231,8 +231,6 @@ pub struct Layout {
pub tables: HashMap<EntityType, Arc<Table>>,
/// The database schema for this subgraph
pub catalog: Catalog,
/// The query to count all entities
pub count_query: String,
/// How many blocks of history the subgraph should keep
pub history_blocks: BlockNumber,

Expand Down Expand Up @@ -290,25 +288,6 @@ impl Layout {
))
}

let count_query = tables
.iter()
.map(|table| {
if table.immutable {
format!(
"select count(*) from \"{}\".\"{}\"",
&catalog.site.namespace, table.name
)
} else {
format!(
"select count(*) from \"{}\".\"{}\" where block_range @> {}",
&catalog.site.namespace, table.name, BLOCK_NUMBER_MAX
)
}
})
.collect::<Vec<_>>()
.join("\nunion all\n");
let count_query = format!("select sum(e.count) from ({}) e", count_query);

let tables: HashMap<_, _> = tables
.into_iter()
.fold(HashMap::new(), |mut tables, table| {
Expand All @@ -322,7 +301,6 @@ impl Layout {
site,
catalog,
tables,
count_query,
history_blocks: i32::MAX,
input_schema: schema.cheap_clone(),
rollups,
Expand Down Expand Up @@ -1037,6 +1015,9 @@ impl Layout {
/// numbers. After this operation, only entity versions inserted or
/// updated at blocks with numbers strictly lower than `block` will
/// remain
///
/// The `i32` that is returned is the amount by which the entity count
/// for the subgraph needs to be adjusted
pub fn revert_block(
&self,
conn: &mut PgConnection,
Expand Down
Loading
Loading