Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for COPY IN protocol #72

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open

Conversation

flimzy
Copy link
Contributor

@flimzy flimzy commented Aug 9, 2024

This PR is still a bit rough. It does not have tests for corner cases (most important: when client sends CopyFail), and I'm not entirely sure that the structure is as simple as it can be.

But I'd love to get some feedback on the approach.

Two main API changes:

  1. CopyIn added to the DataWriter type.
  2. The Execute method in the PortalCache interface needs an additional argument. I've implemented this in a backward-compatible way, though.

Still to do:

  • Clean up commit history
  • What error code should the server return when the client sends a CopyFail?
  • Test for CopyFail from client
  • Test for Text format
  • Test for mixed column formats
  • Should we provide any parsing of the copy stream? IMO, probably not by default (I don't need to parse it for my own needs), but maybe as helper utilities, in future PRs, if needed by someone?

Note that this does not implement CopyOutResponse or CopyBothResponse. I don't (yet) need either of these. But with the groundwork in this PR, they should both be easier to accomplish if/when needed.

Copy link
Owner

@jeroenrinzema jeroenrinzema left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening a PR! Left a few comments. Would like to explore if we could expose the copy implementation as a io interface.

writer.go Outdated Show resolved Hide resolved
writer.go Outdated Show resolved Hide resolved
command.go Outdated Show resolved Hide resolved
command.go Outdated
func (srv *Server) copyDataFn(reader *buffer.Reader, writer *buffer.Writer) CopyDataFn {
return func(ctx context.Context) ([]byte, error) {
var results []byte
err := srv.commandLoop(ctx, reader, writer, srv.handleCopyInCommand(func(r []byte) { results = r }))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to run this within a goroutine and write back the results to a io.Writer? This allows the client not having to keep the entire message in memory. Errors thrown when reading data received from the client could be passed on to the io.Writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is a bit cumbersome. I'm not sure that an io.Writer or goroutine would help much, though.

We're not actually reading anything extra into memory as it is. The results value just gets the contents of reader.Msg, which is already completely in memory.

The reason for this awkward callback is to facilitate a common function signature for handleCommand and handleCopyInCommand, so that they can both be passed to commandLoop (formerly part of consumeCommands.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refactored this bit now, to be quite a bit simpler, I think. I'm curious to hear your thoughts on how it looks now.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting approach, it is indeed a bit simpler. Got a few ideas I was thinking about which I would play with tonight to check if we could avoid having to copy the buffer.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we pass the content length to the handler function. This way we could stream the incoming bytes to the returned io.Reader and terminate the io.Reader when reaching the total byte length. It might make sense to take over the buffered reader in case of a copy command in order to avoid the ErrMessageSizeExceeded error being thrown for larger payloads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which content length are you referring to? For the individual CopyData packets? Or the overall length of the copy data? I don't think the latter is communicated to the server (or even known by the client in many cases). But maybe I'm overlooking something obvious.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not able to test this at the moment, typed messages within the psql wire protocol have a specific length. We need to check whether the message length is send for the entire copydata message or whether the messages are split in chunks. The message length is fetched over here: https://github.com/jeroenrinzema/psql-wire/blob/main/pkg/buffer/reader.go#L125-L131

Copy link
Contributor Author

@flimzy flimzy Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't quite follow.

A single COPY IN is typically comprised of multile CopyData packets.

I don't think there's any way to know the overall length of the entire stream of data to be copied in, until the CopyDone packet is received.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the late response I have been sick over the past week. Good point, I missed that within the PSQL wire protocol implementation. The way how we could do it is as following:

type copyDataReader struct {
	reader *buffer.Reader
	size   int
}

func (r *copyDataReader) Read(p []byte) (int, error) {
	if r.size == 0 {
		_, err := r.reader.ReadType()
		if err != nil {
			return -1, err
		}

		r.size, err = r.reader.ReadMsgSize()
		if err != nil {
			return -1, err
		}
	}

	n, err := io.ReadFull(r.reader.Buffer, p)
	r.size -= n
	return n, err
}

We need to expose a bit more internal logic within the buffered reader but this would allow us to bypass copying the buffer twice. The reader directly interacts with the wire protocol and passes the given buffer by the reader to io.ReadFull. I have just opened a PR exposing the methods used in the example above: #74.

@flimzy flimzy force-pushed the copyIn branch 3 times, most recently from 0af1145 to 3325a8a Compare August 12, 2024 13:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants