Skip to content

Commit

Permalink
[indexer-alt] Add obj_info pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Dec 2, 2024
1 parent 5da94f1 commit cf918de
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS obj_info;
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
-- A table that keeps track of all the updates to object type and owner information.
-- In particular, whenever an object's presence or ownership changes, we insert a
-- new row into this table. Each row should have a unique (object_id, cp_sequence_number)
-- pair.
-- When implementing consistency queries, we will use this table to find all
-- object IDs that match the given filters bounded by the cursor checkpoint.
-- These object IDs can then be used to look up the latest version of the objects
-- bounded by the given checkpoint in the object_versions table.
CREATE TABLE IF NOT EXISTS obj_info
(
object_id BYTEA NOT NULL,
cp_sequence_number BIGINT NOT NULL,
-- An enum describing the object's ownership model:
--
-- Immutable = 0,
-- Address-owned = 1,
-- Object-owned (dynamic field) = 2,
-- Shared = 3.
--
-- Note that there is a distinction between an object that is owned by
-- another object (kind 2), which relates to dynamic fields, and an object
-- that is owned by another object's address (kind 1), which relates to
-- transfer-to-object.
owner_kind SMALLINT,
-- The address for address-owned objects, and the parent object for
-- object-owned objects.
owner_id BYTEA,
-- The following fields relate to the object's type. These only apply to
-- Move Objects. For Move Packages they will all be NULL.
--
-- The type's package ID.
package BYTEA,
-- The type's module name.
module TEXT,
-- The type's name.
name TEXT,
-- The type's type parameters, as a BCS-encoded array of TypeTags.
instantiation BYTEA,
PRIMARY KEY (object_id, cp_sequence_number)
);

CREATE INDEX IF NOT EXISTS obj_info_owner
ON obj_info (owner_kind, owner_id, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_pkg
ON obj_info (package, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_mod
ON obj_info (package, module, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_name
ON obj_info (package, module, name, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_inst
ON obj_info (package, module, name, instantiation, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_owner_pkg
ON obj_info (owner_kind, owner_id, package, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_owner_mod
ON obj_info (owner_kind, owner_id, package, module, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_owner_name
ON obj_info (owner_kind, owner_id, package, module, name, cp_sequence_number, object_id);

CREATE INDEX IF NOT EXISTS obj_info_owner_inst
ON obj_info (owner_kind, owner_id, package, module, name, instantiation, cp_sequence_number, object_id);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod kv_feature_flags;
pub mod kv_objects;
pub mod kv_protocol_configs;
pub mod kv_transactions;
pub mod obj_info;
pub mod obj_versions;
pub mod sum_coin_balances;
pub mod sum_displays;
Expand Down
110 changes: 110 additions & 0 deletions crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, Result};
use diesel_async::RunQueryDsl;
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData, object::Owner};

use crate::{
db,
models::objects::{StoredObjInfo, StoredOwnerKind},
pipeline::{concurrent::Handler, Processor},
schema::obj_info,
};

pub struct ObjInfo;

impl Processor for ObjInfo {
const NAME: &'static str = "obj_info";
type Value = StoredObjInfo;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
let checkpoint_input_objects = checkpoint.checkpoint_input_objects();
let latest_live_output_objects = checkpoint
.latest_live_output_objects()
.into_iter()
.map(|o| (o.id(), o))
.collect::<BTreeMap<_, _>>();
let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
for object_id in checkpoint_input_objects.keys() {
if !latest_live_output_objects.contains_key(object_id) {
// If an input object is not in the latest live output objects, it must have been deleted
// or wrapped in this checkpoint. We keep an entry for it in the table.
// This is necessary when we query objects and iterating over them, so that we don't
// include the object in the result if it was deleted.
values.insert(
*object_id,
StoredObjInfo {
object_id: object_id.to_vec(),
cp_sequence_number,
owner_kind: None,
owner_id: None,
package: None,
module: None,
name: None,
instantiation: None,
},
);
}
}
for (object_id, object) in latest_live_output_objects.iter() {
// If an object is newly created/unwrapped in this checkpoint, or if the owner changed,
// we need to insert an entry for it in the table.
let should_insert = match checkpoint_input_objects.get(object_id) {
Some(input_object) => input_object.owner() != object.owner(),
None => true,
};
if should_insert {
let type_ = object.type_();
values.insert(
*object_id,
StoredObjInfo {
object_id: object_id.to_vec(),
cp_sequence_number,
owner_kind: Some(match object.owner() {
Owner::AddressOwner(_) => StoredOwnerKind::Address,
Owner::ObjectOwner(_) => StoredOwnerKind::Object,
Owner::Shared { .. } => StoredOwnerKind::Shared,
Owner::Immutable => StoredOwnerKind::Immutable,
}),

owner_id: match object.owner() {
Owner::AddressOwner(a) => Some(a.to_vec()),
Owner::ObjectOwner(o) => Some(o.to_vec()),
_ => None,
},

package: type_.map(|t| t.address().to_vec()),
module: type_.map(|t| t.module().to_string()),
name: type_.map(|t| t.name().to_string()),
instantiation: type_
.map(|t| bcs::to_bytes(&t.type_params()))
.transpose()
.map_err(|e| {
anyhow!(
"Failed to serialize type parameters for {}: {e}",
object.id().to_canonical_display(/* with_prefix */ true),
)
})?,
},
);
}
}

Ok(values.into_values().collect())
}
}

#[async_trait::async_trait]
impl Handler for ObjInfo {
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(obj_info::table)
.values(values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
kv_objects::KvObjects, kv_protocol_configs::KvProtocolConfigs, kv_transactions::KvTransactions,
obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, sum_displays::SumDisplays,
sum_obj_types::SumObjTypes, sum_packages::SumPackages,
obj_info::ObjInfo, obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances,
sum_displays::SumDisplays, sum_obj_types::SumObjTypes, sum_packages::SumPackages,
tx_affected_addresses::TxAffectedAddress, tx_affected_objects::TxAffectedObjects,
tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests,
tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
Expand Down Expand Up @@ -435,6 +435,7 @@ pub async fn start_indexer(
indexer.concurrent_pipeline(KvEpochStarts, None).await?;
indexer.concurrent_pipeline(KvObjects, None).await?;
indexer.concurrent_pipeline(KvTransactions, None).await?;
indexer.concurrent_pipeline(ObjInfo, None).await?;
indexer.concurrent_pipeline(ObjVersions, None).await?;
indexer.concurrent_pipeline(TxAffectedAddress, None).await?;
indexer.concurrent_pipeline(TxAffectedObjects, None).await?;
Expand Down
16 changes: 15 additions & 1 deletion crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use sui_field_count::FieldCount;
use sui_types::base_types::ObjectID;

use crate::schema::{
kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, wal_obj_types,
kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances,
wal_obj_types,
};

#[derive(Insertable, Debug, Clone, FieldCount)]
Expand Down Expand Up @@ -126,3 +127,16 @@ where
})
}
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))]
pub struct StoredObjInfo {
pub object_id: Vec<u8>,
pub cp_sequence_number: i64,
pub owner_kind: Option<StoredOwnerKind>,
pub owner_id: Option<Vec<u8>>,
pub package: Option<Vec<u8>>,
pub module: Option<String>,
pub name: Option<String>,
pub instantiation: Option<Vec<u8>>,
}
14 changes: 14 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ diesel::table! {
}
}

diesel::table! {
obj_info (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
package -> Nullable<Bytea>,
module -> Nullable<Text>,
name -> Nullable<Text>,
instantiation -> Nullable<Bytea>,
}
}

diesel::table! {
obj_versions (object_id, object_version) {
object_id -> Bytea,
Expand Down Expand Up @@ -250,6 +263,7 @@ diesel::allow_tables_to_appear_in_same_query!(
kv_objects,
kv_protocol_configs,
kv_transactions,
obj_info,
obj_versions,
sum_coin_balances,
sum_displays,
Expand Down
31 changes: 22 additions & 9 deletions crates/sui-types/src/full_checkpoint_content.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};

use crate::base_types::ObjectRef;
use crate::base_types::{ObjectID, ObjectRef};
use crate::effects::{
IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEffectsAPI, TransactionEvents,
};
Expand Down Expand Up @@ -51,11 +51,24 @@ impl CheckpointData {
eventually_removed_object_refs.into_values().collect()
}

pub fn input_objects(&self) -> Vec<&Object> {
self.transactions
.iter()
.flat_map(|tx| &tx.input_objects)
.collect()
/// Returns all objects that are used as input to the transactions in the checkpoint,
/// and already exist prior to the checkpoint.
pub fn checkpoint_input_objects(&self) -> BTreeMap<ObjectID, &Object> {
let mut output_objects_seen = HashSet::new();
let mut checkpoint_input_objects = BTreeMap::new();
for tx in self.transactions.iter() {
for obj in tx.input_objects.iter() {
let id = obj.id();
if output_objects_seen.contains(&id) || checkpoint_input_objects.contains_key(&id) {
continue;
}
checkpoint_input_objects.insert(id, obj);
}
for obj in tx.output_objects.iter() {
output_objects_seen.insert(obj.id());
}
}
checkpoint_input_objects
}

pub fn all_objects(&self) -> Vec<&Object> {
Expand All @@ -73,7 +86,7 @@ pub struct CheckpointTransaction {
pub transaction: Transaction,
/// The effects produced by executing this transaction
pub effects: TransactionEffects,
/// The events, if any, emitted by this transaciton during execution
/// The events, if any, emitted by this transactions during execution
pub events: Option<TransactionEvents>,
/// The state of all inputs to this transaction as they were prior to execution.
pub input_objects: Vec<Object>,
Expand All @@ -87,7 +100,7 @@ impl CheckpointTransaction {
// Iterator over id and versions for all deleted or wrapped objects
match &self.effects {
TransactionEffects::V1(v1) => Either::Left(
// Effects v1 has delted and wrapped objects versions as the "new" version, not the
// Effects v1 has deleted and wrapped objects versions as the "new" version, not the
// old one that was actually removed. So we need to take these and then look them
// up in the `modified_at_versions`.
// No need to chain unwrapped_then_deleted because these objects must have been wrapped
Expand Down

0 comments on commit cf918de

Please sign in to comment.