update rust to version 1.82 (#7893)
GitOrigin-RevId: 5b44e7f3d7b4664c230d3094870e3b921145c0db
zxqfd555-pw authored and Manul from Pathway committed Dec 19, 2024
1 parent fa09e1c · commit d039b7e
Showing 8 changed files with 71 additions and 72 deletions.
Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -3,7 +3,7 @@ name = "pathway"
 version = "0.16.2"
 edition = "2021"
 publish = false
-rust-version = "1.79"
+rust-version = "1.82"
 license = "BUSL-1.1"
 
 [lib]
rust-toolchain.toml (2 changes: 1 addition & 1 deletion)
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.79"
+channel = "1.82"
src/connectors/data_format.rs (13 changes: 6 additions & 7 deletions)
@@ -1357,14 +1357,13 @@ impl Parser for DebeziumMessageParser
             (key_and_value[0].to_string(), key_and_value[1].to_string())
         }
         KeyValue((k, v)) => {
-            let key = match k {
-                Some(bytes) => prepare_plaintext_string(bytes)?,
-                None => {
-                    if self.key_field_names.is_some() {
-                        return Err(ParseError::EmptyKafkaPayload.into());
-                    }
-                    DEBEZIUM_EMPTY_KEY_PAYLOAD.to_string()
+            let key = if let Some(bytes) = k {
+                prepare_plaintext_string(bytes)?
+            } else {
+                if self.key_field_names.is_some() {
+                    return Err(ParseError::EmptyKafkaPayload.into());
                 }
+                DEBEZIUM_EMPTY_KEY_PAYLOAD.to_string()
             };
             let value = match v {
                 Some(bytes) => prepare_plaintext_string(bytes)?,
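The `match`-to-`if let ... else` rewrite above recurs throughout this commit. Because `if let` is an expression, both branches can yield the value directly, and the `else` branch keeps its early `return Err(...)`. A minimal self-contained sketch of the same shape (hypothetical function name and error strings, not the Pathway API):

fn key_or_default(k: Option<&[u8]>, key_required: bool) -> Result<String, String> {
    // Both branches evaluate to String, so the whole `if let` is an
    // expression that initializes `key` directly.
    let key = if let Some(bytes) = k {
        String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())?
    } else {
        // The fallback branch can still bail out early, exactly like the
        // `None` arm of the original `match`.
        if key_required {
            return Err("empty payload".to_string());
        }
        "<empty>".to_string()
    };
    Ok(key)
}

fn main() {
    assert_eq!(key_or_default(Some(b"k1".as_slice()), true), Ok("k1".to_string()));
    assert_eq!(key_or_default(None, false), Ok("<empty>".to_string()));
    assert!(key_or_default(None, true).is_err());
}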
src/connectors/data_storage.rs (33 changes: 15 additions & 18 deletions)
@@ -1889,7 +1889,7 @@ impl Writer for DeltaTableWriter
 
 pub enum ObjectDownloader {
     Local,
-    S3(S3Bucket),
+    S3(Box<S3Bucket>),
 }
 
 impl ObjectDownloader {
@@ -2061,26 +2061,23 @@ impl DeltaTableReader
 
     fn read_next_row_native(&mut self, is_polling_enabled: bool) -> Result<ParquetRow, ReadError> {
         loop {
-            match &mut self.reader {
-                Some(ref mut reader) => {
-                    match reader.next() {
-                        Some(Ok(row)) => return Ok(row),
-                        Some(Err(parquet_err)) => return Err(ReadError::Parquet(parquet_err)),
-                        None => self.reader = None,
-                    };
-                }
-                None => {
+            if let Some(ref mut reader) = &mut self.reader {
+                match reader.next() {
+                    Some(Ok(row)) => return Ok(row),
+                    Some(Err(parquet_err)) => return Err(ReadError::Parquet(parquet_err)),
+                    None => self.reader = None,
+                };
+            } else {
+                if self.parquet_files_queue.is_empty() {
+                    self.upgrade_table_version(is_polling_enabled)?;
                     if self.parquet_files_queue.is_empty() {
-                        self.upgrade_table_version(is_polling_enabled)?;
-                        if self.parquet_files_queue.is_empty() {
-                            return Err(ReadError::NoObjectsToRead);
-                        }
+                        return Err(ReadError::NoObjectsToRead);
                     }
-                    let next_action = self.parquet_files_queue.pop_front().unwrap();
-                    let local_object = self.object_downloader.download_object(&next_action.path)?;
-                    self.current_event_type = next_action.action_type;
-                    self.reader = Some(DeltaLakeParquetReader::try_from(local_object)?.into_iter());
                 }
+                let next_action = self.parquet_files_queue.pop_front().unwrap();
+                let local_object = self.object_downloader.download_object(&next_action.path)?;
+                self.current_event_type = next_action.action_type;
+                self.reader = Some(DeltaLakeParquetReader::try_from(local_object)?.into_iter());
             }
         }
     }
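The loop above has a drain-then-refill shape: yield rows from the currently open Parquet reader, and when it runs dry, dequeue and open the next file. The refactor replaces the two-arm `match` on `&mut self.reader` with `if let ... else`, removing one level of nesting without changing the flow. A self-contained sketch of the same control flow with toy types (a `VecDeque` of in-memory batches standing in for the Parquet file queue):

use std::collections::VecDeque;

struct BatchReader {
    current: Option<std::vec::IntoIter<i32>>,
    queue: VecDeque<Vec<i32>>,
}

impl BatchReader {
    fn next_row(&mut self) -> Option<i32> {
        loop {
            if let Some(iter) = &mut self.current {
                match iter.next() {
                    Some(row) => return Some(row),
                    // Batch exhausted: drop the iterator so the next pass
                    // falls into the refill branch.
                    None => self.current = None,
                }
            } else {
                // No open batch: dequeue the next one, or report exhaustion.
                let next = self.queue.pop_front()?;
                self.current = Some(next.into_iter());
            }
        }
    }
}

fn main() {
    let mut reader = BatchReader {
        current: None,
        queue: VecDeque::from(vec![vec![1, 2], vec![3]]),
    };
    let mut rows = Vec::new();
    while let Some(row) = reader.next_row() {
        rows.push(row);
    }
    assert_eq!(rows, vec![1, 2, 3]);
}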
src/persistence/cached_object_storage.rs (44 changes: 22 additions & 22 deletions)
@@ -52,36 +52,36 @@ impl MetadataEvent
 /// The `CachedObjectStorage` is a data structure that provides
 /// the following interface:
 /// 1. Upsert an object denoted by its URI, value, and metadata. Each upsert
-///     creates a new version of the data structure.
+///    creates a new version of the data structure.
 /// 2. Remove an object by its URI. Each removal creates a new version of the
-///     data structure.
+///    data structure.
 /// 3. Lookup: find an object by its URI, check if an object with the given
-///     URI is present, get the metadata of the object, etc. The lookups are
-///     performed on the latest state of the data structure.
+///    URI is present, get the metadata of the object, etc. The lookups are
+///    performed on the latest state of the data structure.
 /// 4. Rewind to the given version of the data structure in part. The rewind
-///     alters the state of the data structure: all versions that follow
-///     the given one are removed and are no longer accessible. All versions that are
-///     obsolete after the rewind, e.g., those that don't correspond to the latest state of
-///     any URI, are also removed.
+///    alters the state of the data structure: all versions that follow
+///    the given one are removed and are no longer accessible. All versions that are
+///    obsolete after the rewind, e.g., those that don't correspond to the latest state of
+///    any URI, are also removed.
 ///
 /// The versions are numbered with consecutive integers from 1 onwards. Rewind to
 /// version 0 corresponds to the cleanup of the storage.
 ///
 /// The implementation is as follows:
-/// - There are two types of events: object addition and object removal.
-/// These events are stored both in the selected durable storage and in several
-/// in-memory indexes, denoting events by version; sorted event sequences by the
-/// object URI and the snapshot - the actual, version-unaware state of the data
-/// structure.
-/// - When the data structure starts, it reads the sequence of events and
-/// constructs the mappings described above.
-/// - When a rewind takes place, the versions that need to be deleted are detected
-/// and undone one by one, starting from the latest. Note that these events are
-/// also removed from the durable storage.
-/// - When a lookup takes place, the snapshot is used.
-/// - When an upsert or removal takes place, a new version is created. An event
-/// corresponding to this version is added to the durable storage and to the local
-/// event indexes. It is also reflected in a locally stored snapshot.
+/// * There are two types of events: object addition and object removal.
+///   These events are stored both in the selected durable storage and in several
+///   in-memory indexes, denoting events by version; sorted event sequences by the
+///   object URI and the snapshot - the actual, version-unaware state of the data
+///   structure.
+/// * When the data structure starts, it reads the sequence of events and
+///   constructs the mappings described above.
+/// * When a rewind takes place, the versions that need to be deleted are detected
+///   and undone one by one, starting from the latest. Note that these events are
+///   also removed from the durable storage.
+/// * When a lookup takes place, the snapshot is used.
+/// * When an upsert or removal takes place, a new version is created. An event
+///   corresponding to this version is added to the durable storage and to the local
+///   event indexes. It is also reflected in a locally stored snapshot.
 #[derive(Debug)]
 pub struct CachedObjectStorage {
     backend: Box<dyn PersistenceBackend>,
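The doc comment above describes an event-log-plus-snapshot design: upserts and removals append versioned events, lookups read only the snapshot, and a rewind undoes events newer than the target version. A toy in-memory analogue of that interface — an illustrative sketch under simplified assumptions (no durable backend, no metadata, nothing shared with the real `CachedObjectStorage`):

use std::collections::HashMap;

enum Event {
    Upsert(String, Vec<u8>),
    Remove(String),
}

#[derive(Default)]
struct VersionedStore {
    // (version, event, value the URI had *before* the event — used to undo it)
    log: Vec<(u64, Event, Option<Vec<u8>>)>,
    snapshot: HashMap<String, Vec<u8>>,
    version: u64,
}

impl VersionedStore {
    fn upsert(&mut self, uri: &str, value: Vec<u8>) {
        self.version += 1;
        let prev = self.snapshot.insert(uri.to_string(), value.clone());
        self.log.push((self.version, Event::Upsert(uri.to_string(), value), prev));
    }

    fn remove(&mut self, uri: &str) {
        self.version += 1;
        let prev = self.snapshot.remove(uri);
        self.log.push((self.version, Event::Remove(uri.to_string()), prev));
    }

    fn lookup(&self, uri: &str) -> Option<&Vec<u8>> {
        // Lookups always read the latest, version-unaware snapshot.
        self.snapshot.get(uri)
    }

    fn rewind(&mut self, target: u64) {
        // Undo events above `target`, newest first, restoring prior values.
        while let Some((version, event, prev)) = self.log.pop() {
            if version <= target {
                self.log.push((version, event, prev));
                break;
            }
            let uri = match event {
                Event::Upsert(uri, _) | Event::Remove(uri) => uri,
            };
            match prev {
                Some(old) => self.snapshot.insert(uri, old),
                None => self.snapshot.remove(&uri),
            };
        }
        self.version = target;
    }
}

fn main() {
    let mut store = VersionedStore::default();
    store.upsert("s3://a", b"v1".to_vec()); // version 1
    store.upsert("s3://a", b"v2".to_vec()); // version 2
    store.remove("s3://a"); // version 3
    assert_eq!(store.lookup("s3://a"), None);
    store.rewind(2);
    assert_eq!(store.lookup("s3://a"), Some(&b"v2".to_vec()));
    store.rewind(0); // rewind to version 0 == full cleanup
    assert_eq!(store.lookup("s3://a"), None);
}

Storing the pre-event value alongside each log entry is one way to make each undo step cheap; per the comment above, the real implementation instead keeps per-URI event sequences for this purpose.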
src/persistence/config.rs (5 changes: 4 additions & 1 deletion)
@@ -44,7 +44,10 @@ pub type ConnectorWorkerPair = (PersistentId, usize);
 #[derive(Debug, Clone)]
 pub enum PersistentStorageConfig {
     Filesystem(PathBuf),
-    S3 { bucket: S3Bucket, root_path: String },
+    S3 {
+        bucket: Box<S3Bucket>,
+        root_path: String,
+    },
     Mock(HashMap<ConnectorWorkerPair, Vec<Event>>),
 }
 
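Several hunks in this commit (`ObjectDownloader::S3` above, `PersistentStorageConfig::S3` here, and the matching `Box::new(...)` construction sites in src/python_api.rs below) box the `S3Bucket` payload. An enum is as large as its largest variant, so one bulky variant inflates every value of the type, including the cheap ones; boxing shrinks the variant to pointer size. This is the standard remedy suggested by clippy's `large_enum_variant` lint — presumably what the newer toolchain started flagging, though the commit does not say so. A sketch with a stand-in payload type of exaggerated size:

#[allow(dead_code)]
struct BigPayload([u8; 1024]); // stand-in for a bulky type such as S3Bucket

#[allow(dead_code)]
enum Slim {
    Local,
    S3(Box<BigPayload>), // boxed: the variant holds only a heap pointer
}

#[allow(dead_code)]
enum Bulky {
    Local,
    S3(BigPayload), // inline: every value, even Local, reserves the full 1 KiB
}

fn main() {
    // The boxed enum is pointer-sized (Box's non-null niche even absorbs the
    // discriminant); the inline enum must fit its largest variant.
    println!("boxed:  {} bytes", std::mem::size_of::<Slim>());
    println!("inline: {} bytes", std::mem::size_of::<Bulky>());
}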
src/persistence/input_snapshot.rs (27 changes: 14 additions & 13 deletions)
@@ -151,8 +151,8 @@ impl InputSnapshotReader

     fn next_event(&mut self) -> Result<Event, Error> {
         loop {
-            match &mut self.reader {
-                Some(reader) => match deserialize_from(reader) {
+            if let Some(reader) = &mut self.reader {
+                match deserialize_from(reader) {
                     Ok(entry) => return Ok(entry),
                     Err(e) => match *e {
                         BincodeError::Io(e) => {
@@ -164,19 +164,20 @@ impl InputSnapshotReader
                         }
                         _ => return Err(Error::Bincode(*e)),
                     },
-                },
-                None => {
-                    if self.next_chunk_idx >= self.chunk_ids.len() {
-                        break;
-                    }
-                    let next_chunk_key = format!("{}", self.chunk_ids[self.next_chunk_idx]);
-                    info!("Snapshot reader proceeds to the chunk {next_chunk_key} after {} snapshot entries", self.entries_read);
-                    let contents = self.backend.get_value(&next_chunk_key)?;
-                    let cursor = Cursor::new(contents);
-                    self.reader = Some(BufReader::new(cursor));
-                    self.next_chunk_idx += 1;
                 }
             }
+            if self.next_chunk_idx >= self.chunk_ids.len() {
+                break;
+            }
+            let next_chunk_key = format!("{}", self.chunk_ids[self.next_chunk_idx]);
+            info!(
+                "Snapshot reader proceeds to the chunk {next_chunk_key} after {} snapshot entries",
+                self.entries_read
+            );
+            let contents = self.backend.get_value(&next_chunk_key)?;
+            let cursor = Cursor::new(contents);
+            self.reader = Some(BufReader::new(cursor));
+            self.next_chunk_idx += 1;
         }
         Ok(Event::Finished)
     }
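The same drain-or-advance loop appears here, with two extra details: each chunk's bytes are fetched from the persistence backend and wrapped in `BufReader::new(Cursor::new(contents))`, and an I/O error from `deserialize_from` marks the end of the current chunk. A compact sketch of the chunk-advancing logic over in-memory buffers (toy types; the real reader deserializes bincode entries rather than raw bytes):

use std::io::{BufReader, Cursor, Read};

struct ChunkedReader {
    chunks: Vec<Vec<u8>>, // stand-in for chunks fetched from a backend
    next_chunk_idx: usize,
    reader: Option<BufReader<Cursor<Vec<u8>>>>,
}

impl ChunkedReader {
    fn next_byte(&mut self) -> Option<u8> {
        loop {
            if let Some(reader) = &mut self.reader {
                let mut buf = [0u8; 1];
                match reader.read(&mut buf) {
                    // Chunk exhausted: drop the reader and fall through to
                    // the advance step below, as in the refactored code.
                    Ok(0) | Err(_) => self.reader = None,
                    Ok(_) => return Some(buf[0]),
                }
            }
            if self.next_chunk_idx >= self.chunks.len() {
                return None;
            }
            let contents = self.chunks[self.next_chunk_idx].clone();
            self.reader = Some(BufReader::new(Cursor::new(contents)));
            self.next_chunk_idx += 1;
        }
    }
}

fn main() {
    let mut reader = ChunkedReader {
        chunks: vec![vec![1, 2], vec![], vec![3]],
        next_chunk_idx: 0,
        reader: None,
    };
    let mut out = Vec::new();
    while let Some(byte) = reader.next_byte() {
        out.push(byte);
    }
    assert_eq!(out, vec![1, 2, 3]);
}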
src/python_api.rs (17 changes: 8 additions & 9 deletions)
@@ -4329,13 +4329,12 @@ impl DataStorage
     }
 
     fn build_csv_parser_settings(&self, py: pyo3::Python) -> CsvReaderBuilder {
-        match &self.csv_parser_settings {
-            Some(parser_settings) => parser_settings.borrow(py).build_csv_reader_builder(),
-            None => {
-                let mut builder = CsvReaderBuilder::new();
-                builder.has_headers(false);
-                builder
-            }
+        if let Some(parser_settings) = &self.csv_parser_settings {
+            parser_settings.borrow(py).build_csv_reader_builder()
+        } else {
+            let mut builder = CsvReaderBuilder::new();
+            builder.has_headers(false);
+            builder
         }
     }
 
@@ -4549,7 +4548,7 @@
 
     fn object_downloader(&self, py: pyo3::Python) -> PyResult<ObjectDownloader> {
         if self.aws_s3_settings.is_some() {
-            Ok(ObjectDownloader::S3(self.s3_bucket(py)?))
+            Ok(ObjectDownloader::S3(Box::new(self.s3_bucket(py)?)))
         } else {
             Ok(ObjectDownloader::Local)
         }
@@ -4643,7 +4642,7 @@
         let bucket = self.s3_bucket(py)?;
         let path = self.path()?;
         Ok(PersistentStorageConfig::S3 {
-            bucket,
+            bucket: Box::new(bucket),
             root_path: path.into(),
         })
     }
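In `build_csv_parser_settings` the same rewrite puts the fallback builder in the `else` branch of an `if let` expression. Assuming `CsvReaderBuilder` aliases the `csv` crate's `ReaderBuilder` (a guess from the method names; the sketch requires the `csv` crate as a dependency), the fallback in isolation looks like this:

use csv::ReaderBuilder;

// Hypothetical standalone version: use the provided builder when present,
// otherwise fall back to a headerless default.
fn csv_builder(settings: Option<ReaderBuilder>) -> ReaderBuilder {
    if let Some(builder) = settings {
        builder
    } else {
        let mut builder = ReaderBuilder::new();
        builder.has_headers(false);
        builder
    }
}

fn main() {
    let mut rdr = csv_builder(None).from_reader("a,b\n1,2\n".as_bytes());
    // With has_headers(false) the first line counts as data, not a header.
    assert_eq!(rdr.records().filter_map(Result::ok).count(), 2);
}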
