Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP add backend messages serialization and frontend messages deserialization #946

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 87 additions & 2 deletions postgres-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
#![warn(missing_docs, rust_2018_idioms, clippy::all)]

use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use std::io;
use bytes::{Bytes, BufMut, BytesMut};
use std::{io, intrinsics::size_of};
use memchr::memchr;
use std::mem;

pub mod authentication;
pub mod escape;
Expand Down Expand Up @@ -76,3 +78,86 @@ macro_rules! from_usize {

from_usize!(i16);
from_usize!(i32);

pub struct Buffer {
bytes: bytes::Bytes,
idx: usize,
}

impl Buffer {
pub fn new(bytes: bytes::Bytes, idx: usize) -> Self {
Self { bytes, idx }
}
}

impl Buffer {
#[inline]
pub fn slice(&self) -> &[u8] {
&self.bytes[self.idx..]
}

#[inline]
pub fn is_empty(&self) -> bool {
self.slice().is_empty()
}

#[inline]
pub fn read_cstr(&mut self) -> io::Result<Bytes> {
match memchr(0, self.slice()) {
Some(pos) => {
let start = self.idx;
let end = start + pos;
let cstr = self.bytes.slice(start..end);
self.idx = end + 1;
Ok(cstr)
}
None => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF",
)),
}
}

#[inline]
fn read_by_size(&mut self, size: usize, kind: &str) -> io::Result<Bytes> {
let start = self.idx;
let end = start + size + 1;
match self.bytes.get(start..end) {
Some(s) => {
self.idx = end;
Ok(s.into())
},
None => io::Error::new(io::ErrorKind::UnexpectedEOF, format!("Unable to read {}", kind)),
}
}

#[inline]
pub fn read_u16(&mut self) -> io::Result<u16> {
self.read_by_size(mem::size_of::<u16>(), "u16")
}

#[inline]
pub fn read_i32(&mut self) -> io::Result<i32> {
self.read_by_size(mem::size_of::<i32>(), "i32")
}

#[inline]
pub fn read_i16(&mut self) -> io::Result<i32> {
self.read_by_size(mem::size_of::<i16>(), "i16")
}

#[inline]
pub fn read_byten(&mut self, n: usize) -> io::Result<Bytes> {
self.read_by_size(1, &format!("byte{}", n))
}

#[inline]
pub fn

#[inline]
pub fn read_all(&mut self) -> Bytes {
let buf = self.bytes.slice(self.idx..);
self.idx = self.bytes.len();
buf
}
}
250 changes: 249 additions & 1 deletion postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#![allow(missing_docs)]

use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use bytes::{Bytes, BytesMut, BufMut, Buf};
use fallible_iterator::FallibleIterator;
use memchr::memchr;
use std::cmp;
use std::convert::TryInto;
use std::io::{self, Read};
use std::ops::Range;
use std::str;
Expand Down Expand Up @@ -278,6 +279,253 @@ impl Message {
}
}

impl TryInto<Bytes> for Message {
type Error = io::Error;

fn try_into(self) -> Result<Bytes, Self::Error> {
match self {
Message::AuthenticationCleartextPassword => {
let mut buf = BytesMut::with_capacity(9);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(8i32);
buf.put_i32(3i32);
Ok(Bytes::from(buf))
},
Message::AuthenticationGss => {
let mut buf = BytesMut::with_capacity(9);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(8i32);
buf.put_i32(7i32);
Ok(Bytes::from(buf))
},
Message::AuthenticationGssContinue(body) => {
let len = body.0.len();
let mut buf = BytesMut::with_capacity(len+1+4+4);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len as i32);
buf.put_i32(8i32);
buf.put_slice(body.data());
Ok(Bytes::from(buf))
},
Message::AuthenticationKerberosV5 => {
let len = 8i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(8i32);
buf.put_i32(2i32);
Ok(Bytes::from(buf))
},
Message::AuthenticationMd5Password(body) => {
let len = 12i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len);
buf.put_i32(5i32);
buf.put_slice(&body.salt());
Ok(Bytes::from(buf))
},
Message::AuthenticationOk => {
let len = 8i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len);
buf.put_i32(0i32);
Ok(Bytes::from(buf))
},
Message::AuthenticationSasl(body) => {
let len = body.0.len() + 1 + 4;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len as i32);
buf.put_i32(10i32);
buf.put_slice(body.0.chunk());
Ok(Bytes::from(buf))
},
Message::AuthenticationSaslContinue(body) => {
let len = body.0.len() + 1 + 4 + 4;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len as i32);
buf.put_i32(11i32);
buf.put_slice(body.data());
Ok(Bytes::from(buf))
},
Message::AuthenticationSaslFinal(body) => {
let len = body.0.len() + 1 + 4 + 4;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(AUTHENTICATION_TAG);
buf.put_i32(len as i32);
buf.put_i32(12i32);
buf.put_slice(body.data());
Ok(Bytes::from(buf))
},
Message::AuthenticationScmCredential => {
let len = 8i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_i32(len);
buf.put_i32(6i32);
Ok(Bytes::from(buf))
},
Message::AuthenticationSspi => {
let len = 8i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_i32(len);
buf.put_i32(9i32);
Ok(Bytes::from(buf))
},
Message::BackendKeyData(body) => {
let len = 12i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(BACKEND_KEY_DATA_TAG);
buf.put_i32(len);
buf.put_i32(body.process_id());
buf.put_i32(body.secret_key());
Ok(Bytes::from(buf))
},
Message::BindComplete => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(BIND_COMPLETE_TAG);
buf.put_i32(4i32);
Ok(Bytes::from(buf))
},
Message::CloseComplete => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(CLOSE_COMPLETE_TAG);
buf.put_i32(4i32);
Ok(Bytes::from(buf))
},
Message::CommandComplete(body) => {
let len = body.tag()?.len() + 1 + 4;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(COMMAND_COMPLETE_TAG);
buf.put_i32(len as i32);
buf.put_slice(body.tag()?.as_bytes());
Ok(Bytes::from(buf))
},
Message::CopyData(body) => {
let len = body.data().len() + 1 + 4;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(COPY_DATA_TAG);
buf.put_i32(len as i32);
buf.put_slice(body.data());
Ok(Bytes::from(buf))
},
Message::CopyDone => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(COPY_DONE_TAG);
buf.put_i32(len);
Ok(Bytes::from(buf))
},
Message::CopyInResponse(body) => {
let len = body.len + 1 + 4 + 1 + 2 + 2;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(COPY_IN_RESPONSE_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::CopyOutResponse(body) => {
let len = body.len + 1 + 4 + 1 + 2 + 2;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(COPY_OUT_RESPONSE_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::DataRow(body) => {
let len = body.len + 1 + 4 + 2 + 4;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(DATA_ROW_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::EmptyQueryResponse => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(EMPTY_QUERY_RESPONSE_TAG);
buf.put_i32(len);
Ok(Bytes::from(buf))
},
Message::ErrorResponse(body) => {
let len = body.storage.len() + 1;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(ERROR_RESPONSE_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::NoData => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(NO_DATA_TAG);
buf.put_i32(len);
Ok(Bytes::from(buf))
},
Message::NoticeResponse(body) => {
let len = body.storage.len() + 1;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(NOTICE_RESPONSE_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::NotificationResponse(body) => {
Ok(body.message)
},
Message::ParameterDescription(body) => {
let len = body.len + 1;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(PARAMETER_DESCRIPTION_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
},
Message::ParameterStatus(body) => {
let len = body.value.len() + 1;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(PARAMETER_STATUS_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.value);
Ok(Bytes::from(buf))
},
Message::ParseComplete => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(PARSE_COMPLETE_TAG);
buf.put_i32(len);
Ok(Bytes::from(buf))
},
Message::PortalSuspended => {
let len = 4i32;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(PORTAL_SUSPENDED_TAG);
buf.put_i32(len);
Ok(Bytes::from(buf))
},
Message::ReadyForQuery(body) => {
let len = 5;
let mut buf = BytesMut::with_capacity(len);
buf.put_u8(READY_FOR_QUERY_TAG);
buf.put_i32(len as i32);
buf.put_u8(body.status);
Ok(Bytes::from(buf))
},
Message::RowDescription(body) => {
let len = body.len;
let mut buf = BytesMut::with_capacity(len as usize);
buf.put_u8(ROW_DESCRIPTION_TAG);
buf.put_i32(len as i32);
buf.put_slice(&body.storage);
Ok(Bytes::from(buf))
}
}
}
}

struct Buffer {
bytes: Bytes,
idx: usize,
Expand Down
Loading