Skip to content

Commit

Permalink
fix: Implement context-aware APIs for better concurrency and robustness
Browse files Browse the repository at this point in the history
closes everFinance#56

Here is the commit message:

**Summary**

This commit adds support for context parameters to various functions across the codebase, enabling cancellation of long-running operations and improving error handling.

**Key Changes**

- Added context support to various functions, such as `GetTransactionData`, `BroadcastData`, `Send*`, and `WarpTransfer`, allowing for cancellation and improved error handling.
- Updated multiple files to use the `context` package and its functions.
- Modified several functions to take a `context.Context` as an argument, enabling cancellation and better error handling.
  • Loading branch information
Laisky committed May 20, 2024
1 parent be9761b commit 1b5a530
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 80 deletions.
30 changes: 18 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goar

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -372,38 +373,38 @@ func (c *Client) GetTransactionAnchor() (anchor string, err error) {
return
}

func (c *Client) SubmitTransaction(tx *types.Transaction) (status string, code int, err error) {
func (c *Client) SubmitTransaction(ctx context.Context, tx *types.Transaction) (status string, code int, err error) {
by, err := json.Marshal(tx)
if err != nil {
return
}

body, statusCode, err := c.httpPost("tx", by)
body, statusCode, err := c.httpPost(ctx, "tx", by)
status = string(body)
code = statusCode
return
}

func (c *Client) SubmitChunks(gc *types.GetChunk) (status string, code int, err error) {
func (c *Client) SubmitChunks(ctx context.Context, gc *types.GetChunk) (status string, code int, err error) {
byteGc, err := gc.Marshal()
if err != nil {
return
}

var body []byte
body, code, err = c.httpPost("chunk", byteGc)
body, code, err = c.httpPost(ctx, "chunk", byteGc)
status = string(body)
return
}

// Arql is Deprecated, recommended to use GraphQL
func (c *Client) Arql(arql string) (ids []string, err error) {
body, _, err := c.httpPost("arql", []byte(arql))
func (c *Client) Arql(ctx context.Context, arql string) (ids []string, err error) {
body, _, err := c.httpPost(ctx, "arql", []byte(arql))
err = json.Unmarshal(body, &ids)
return
}

func (c *Client) GraphQL(query string) ([]byte, error) {
func (c *Client) GraphQL(ctx context.Context, query string) ([]byte, error) {
// generate query
graQuery := struct {
Query string `json:"query"`
Expand All @@ -414,7 +415,7 @@ func (c *Client) GraphQL(query string) ([]byte, error) {
}

// query from http client
data, statusCode, err := c.httpPost("graphql", byQuery)
data, statusCode, err := c.httpPost(ctx, "graphql", byQuery)
if statusCode == 429 {
return nil, ErrRequestLimit
}
Expand Down Expand Up @@ -529,15 +530,20 @@ func (c *Client) httpGet(_path string) (body []byte, statusCode int, err error)
return
}

func (c *Client) httpPost(_path string, payload []byte) (body []byte, statusCode int, err error) {
func (c *Client) httpPost(ctx context.Context, _path string, payload []byte) (body []byte, statusCode int, err error) {
u, err := url.Parse(c.url)
if err != nil {
return
}

u.Path = path.Join(u.Path, _path)

resp, err := c.client.Post(u.String(), "application/json", bytes.NewReader(payload))
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(payload))
if err != nil {
return
}

resp, err := c.client.Do(req)
if err != nil {
return
}
Expand Down Expand Up @@ -1029,7 +1035,7 @@ func (c *Client) DataSyncRecord(endOffset string, intervalsNum int) ([]string, e
return result, nil
}

func (c *Client) SubmitToWarp(tx *types.Transaction) ([]byte, error) {
func (c *Client) SubmitToWarp(ctx context.Context, tx *types.Transaction) ([]byte, error) {
by, err := json.Marshal(tx)
if err != nil {
return nil, err
Expand All @@ -1040,7 +1046,7 @@ func (c *Client) SubmitToWarp(tx *types.Transaction) ([]byte, error) {
}

u.Path = path.Join(u.Path, "/gateway/sequencer/register")
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(by))
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(by))
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions client_broadcast.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package goar

import (
"context"
"errors"
"fmt"

"github.com/everFinance/goar/types"
)

func (c *Client) BroadcastData(txId string, data []byte, numOfNodes int64, peers ...string) error {
func (c *Client) BroadcastData(ctx context.Context, txId string, data []byte, numOfNodes int64, peers ...string) error {
var err error
if len(peers) == 0 {
peers, err = c.GetPeers()
Expand All @@ -24,7 +26,7 @@ func (c *Client) BroadcastData(txId string, data []byte, numOfNodes int64, peers
continue
}

if err = uploader.Once(); err != nil {
if err = uploader.Once(ctx); err != nil {
continue
}

Expand Down
11 changes: 7 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package goar

import (
"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"github.com/stretchr/testify/assert"
"context"
"os"
"strconv"
"testing"

"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"github.com/stretchr/testify/assert"
)

// func TestGetTransactionByID(t *testing.T) {
Expand Down Expand Up @@ -219,12 +221,13 @@ func Test_GetTxDataFromPeers(t *testing.T) {
}

func TestClient_BroadcastData(t *testing.T) {
ctx := context.Background()
cli := NewClient("https://arweave.net")
txId := "J5FY1Ovd6JJ49WFHfCf-1wDM1TbaPSdKnGIB_8ePErE"
data, err := cli.GetTransactionData(txId, "json")
assert.NoError(t, err)

err = cli.BroadcastData(txId, data, 20)
err = cli.BroadcastData(ctx, txId, data, 20)
assert.NoError(t, err)
}

Expand Down
7 changes: 5 additions & 2 deletions example/api_example_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package example

import (
"context"
"testing"

"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"testing"

"github.com/everFinance/goar"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -63,6 +65,7 @@ func Test_Arq1(t *testing.T) {
}

func Test_Arq(t *testing.T) {
ctx := context.Background()
arqStr := `{
"op": "and",
"expr1": {
Expand All @@ -79,7 +82,7 @@ func Test_Arq(t *testing.T) {
// create client
arNode := "https://arweave.net"
c := goar.NewClient(arNode)
ids, err := c.Arql(arqStr)
ids, err := c.Arql(ctx, arqStr)
t.Log(len(ids))
assert.NoError(t, err)
sstr := make([]string, 0)
Expand Down
14 changes: 9 additions & 5 deletions example/chunks_tx_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package example

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
Expand Down Expand Up @@ -71,6 +72,7 @@ func assemblyDataTx(bigData []byte, wallet *goar.Wallet, tags []types.Tag) (*typ

// test upload post big size data by chunks
func Test_PostBigDataByChunks(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/2mbFile.pdf"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -83,11 +85,12 @@ func Test_PostBigDataByChunks(t *testing.T) {
// uploader Transaction
uploader, err := goar.CreateUploader(wallet.Client, tx, nil)
assert.NoError(t, err)
assert.NoError(t, uploader.Once())
assert.NoError(t, uploader.Once(ctx))
}

// test retry upload(断点重传) post big size data by tx id
func Test_RetryUploadDataByTxId(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/3mPhoto.jpg"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -100,7 +103,7 @@ func Test_RetryUploadDataByTxId(t *testing.T) {

// 1. post this tx without data
tx.Data = ""
body, status, err := wallet.Client.SubmitTransaction(tx)
body, status, err := wallet.Client.SubmitTransaction(ctx, tx)
assert.NoError(t, err)
t.Logf("post tx without data; body: %s, status: %d", string(body), status)

Expand All @@ -124,11 +127,12 @@ func Test_RetryUploadDataByTxId(t *testing.T) {
// get uploader by txId and post big data by chunks
uploader, err := goar.CreateUploader(wallet.Client, tx.ID, bigData)
assert.NoError(t, err)
assert.NoError(t, uploader.Once())
assert.NoError(t, uploader.Once(ctx))
}

// test continue upload(断点续传) big size data by last time uploader
func Test_ContinueUploadDataByLastUploader(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/1.8mPhoto.jpg"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -143,7 +147,7 @@ func Test_ContinueUploadDataByLastUploader(t *testing.T) {
assert.NoError(t, err)
// only upload 2 chunks to ar chain
for !uploader.IsComplete() && uploader.ChunkIndex <= 2 {
err := uploader.UploadChunk()
err := uploader.UploadChunk(ctx)
assert.NoError(t, err)
}

Expand All @@ -165,7 +169,7 @@ func Test_ContinueUploadDataByLastUploader(t *testing.T) {
// new uploader object by last time uploader
newUploader, err := goar.CreateUploader(wallet.Client, lastUploader.FormatSerializedUploader(), bigData)
assert.NoError(t, err)
assert.NoError(t, newUploader.Once())
assert.NoError(t, newUploader.Once(ctx))

// end remove jsonUploaderFile.json file
_ = os.Remove("./jsonUploaderFile.json")
Expand Down
7 changes: 5 additions & 2 deletions example/local_data_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package example

import (
"context"
"os"
"testing"

Expand All @@ -10,6 +11,7 @@ import (
)

func Test_SendData(t *testing.T) {
ctx := context.Background()
arNode := "https://arweave.net"
w, err := goar.NewWalletFromPath("./wallet/account1.json", arNode) // your wallet private key
assert.NoError(t, err)
Expand All @@ -22,7 +24,7 @@ func Test_SendData(t *testing.T) {
{Name: "xxxx", Value: "sssss"},
{Name: "yyyyyy", Value: "kkkkkk"},
}
tx, err := w.SendDataSpeedUp(data, tags, 10)
tx, err := w.SendDataSpeedUp(ctx, data, tags, 10)
assert.NoError(t, err)
t.Logf("tx hash: %s", tx.ID)
}
Expand Down Expand Up @@ -61,6 +63,7 @@ func TestConcurrentDownloadStream(t *testing.T) {
}

func TestSendDataStream(t *testing.T) {
ctx := context.Background()
arNode := "https://arweave.net"
w, err := goar.NewWalletFromPath("./testKey.json", arNode) // your wallet private key
assert.NoError(t, err)
Expand All @@ -73,7 +76,7 @@ func TestSendDataStream(t *testing.T) {
{Name: "Content-Type", Value: "img/jpeg"},
{Name: "test", Value: "kevin-test"},
}
tx, err := w.SendDataStreamSpeedUp(data, tags, 10)
tx, err := w.SendDataStreamSpeedUp(ctx, data, tags, 10)
assert.NoError(t, err)
t.Log(tx.ID)
// test arId: k5IgHLTag_3bB6Sp5tTUhrFrPPvU5MjevV468dfxNKk
Expand Down
16 changes: 8 additions & 8 deletions types/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ const (

// Errors from /chunk we should never try and continue on.
var FATAL_CHUNK_UPLOAD_ERRORS = map[string]struct{}{
"{\"error\":\"disk_full\"}": struct{}{},
"{\"error\":\"invalid_json\"}": struct{}{},
"{\"error\":\"chunk_too_big\"}": struct{}{},
"{\"error\":\"data_path_too_big\"}": struct{}{},
"{\"error\":\"offset_too_big\"}": struct{}{},
"{\"error\":\"data_size_too_big\"}": struct{}{},
"{\"error\":\"chunk_proof_ratio_not_attractive\"}": struct{}{},
"{\"error\":\"invalid_proof\"}": struct{}{},
"{\"error\":\"disk_full\"}": {},
"{\"error\":\"invalid_json\"}": {},
"{\"error\":\"chunk_too_big\"}": {},
"{\"error\":\"data_path_too_big\"}": {},
"{\"error\":\"offset_too_big\"}": {},
"{\"error\":\"data_size_too_big\"}": {},
"{\"error\":\"chunk_proof_ratio_not_attractive\"}": {},
"{\"error\":\"invalid_proof\"}": {},
}

// about bundle
Expand Down
2 changes: 1 addition & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ type BundlrResp struct {
N string `json:"n"`
Public string `json:"public"`
Block int64 `json:"block"`
ValidatorSignatures []string `json:"validatorSignatures"`
ValidatorSignatures []string `json:"validatorSignatures"`
}
Loading

0 comments on commit 1b5a530

Please sign in to comment.