Skip to content

Commit

Permalink
fix all sql to point on schema solana_tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Nov 9, 2023
1 parent 10b20fa commit bc7a5ce
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/solana-token-tracker-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
RootCmd.Flags().Uint64("stop-block", 0, "stop block number (0 means no stop block)")

// Manifest
RootCmd.Flags().String("output-module-type", "proto:hivemapper.types.v1.Output", "Expected output module type")
RootCmd.Flags().String("output-module-type", "proto:solana_token_tracker.types.v1.Output", "Expected output module type")
}

func rootRun(cmd *cobra.Command, args []string) error {
Expand Down
20 changes: 10 additions & 10 deletions data/psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Psql) Init() error {
return nil
}
func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err error) {
row := p.tx.QueryRow("INSERT INTO hivemapper.blocks (number, hash, timestamp) VALUES ($1, $2, $3) RETURNING id", clock.Number, clock.Id, clock.Timestamp.AsTime())
row := p.tx.QueryRow("INSERT INTO solana_tokens.blocks (number, hash, timestamp) VALUES ($1, $2, $3) RETURNING id", clock.Number, clock.Id, clock.Timestamp.AsTime())
err = row.Err()
if err != nil {
return 0, fmt.Errorf("inserting clock: %w", err)
Expand All @@ -66,7 +66,7 @@ func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err erro

func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTransactionID int64, err error) {
//todo: create a transaction cache
rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash)
rows, err := p.tx.Query("SELECT id FROM solana_tokens.transactions WHERE hash = $1", transactionHash)
p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash))
if err != nil {
return 0, fmt.Errorf("selecting transaction: %w", err)
Expand All @@ -78,7 +78,7 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra
return
}

row := p.tx.QueryRow("INSERT INTO hivemapper.transactions (hash, block_id) VALUES ($1, $2) RETURNING id", transactionHash, dbBlockID)
row := p.tx.QueryRow("INSERT INTO solana_tokens.transactions (hash, block_id) VALUES ($1, $2) RETURNING id", transactionHash, dbBlockID)
err = row.Err()
if err != nil {
return 0, fmt.Errorf("inserting transaction: %w", err)
Expand All @@ -94,7 +94,7 @@ func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []*
if err != nil {
return fmt.Errorf("handling transaction: %w", err)
}
_, err = p.tx.Exec("INSERT INTO hivemapper.derived_addresses (transaction_id, address, derivedAddress) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", dbTransactionID, initializedAccount.Owner, initializedAccount.Account)
_, err = p.tx.Exec("INSERT INTO solana_tokens.derived_addresses (transaction_id, address, derivedAddress) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", dbTransactionID, initializedAccount.Owner, initializedAccount.Account)
if err != nil {
return fmt.Errorf("trx_hash: %d inserting derived_addresses: %w", dbBlockID, err)
}
Expand All @@ -106,7 +106,7 @@ var NotFound = errors.New("Not found")

func (p *Psql) resolveAddress(derivedAddress string) (string, error) {
resolvedAddress := ""
rows, err := p.tx.Query("SELECT address FROM hivemapper.derived_addresses WHERE derivedAddress = $1", derivedAddress)
rows, err := p.tx.Query("SELECT address FROM solana_tokens.derived_addresses WHERE derivedAddress = $1", derivedAddress)
if err != nil {
return "", fmt.Errorf("selecting derived_addresses: %w", err)
}
Expand All @@ -127,7 +127,7 @@ func (p *Psql) HandleTransfers(dbBlockID int64, transfers []*pb.Transfer) error
return fmt.Errorf("inserting transaction: %w", err)
}
//{"error": "handle BlockScopedData message: rollback transaction: rolling back transaction: driver: bad connection: while handling err handle transfers: inserting transfer: pq: unexpected Parse response 'C'"}
_, err = p.tx.Exec("INSERT INTO hivemapper.transfers (transaction_id, from_address, to_address, amount) VALUES ($1, $2, $3, $4)", dbTransactionID, transfer.From, transfer.To, transfer.Amount)
_, err = p.tx.Exec("INSERT INTO solana_tokens.transfers (transaction_id, from_address, to_address, amount) VALUES ($1, $2, $3, $4)", dbTransactionID, transfer.From, transfer.To, transfer.Amount)
if err != nil {
fmt.Println("processing transfer: ", transfer.From, transfer.To, transfer.Amount, transfer.TrxHash)
return fmt.Errorf("inserting transfer: %w", err)
Expand All @@ -137,7 +137,7 @@ func (p *Psql) HandleTransfers(dbBlockID int64, transfers []*pb.Transfer) error
}

func (p *Psql) insertMint(dbTransactionID int64, mint *pb.Mint) (dbMintID int64, err error) {
row := p.tx.QueryRow("INSERT INTO hivemapper.mints (transaction_id, to_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, mint.To, mint.Amount)
row := p.tx.QueryRow("INSERT INTO solana_tokens.mints (transaction_id, to_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, mint.To, mint.Amount)
err = row.Err()
if err != nil {
return 0, fmt.Errorf("inserting mint: %w", err)
Expand All @@ -163,7 +163,7 @@ func (p *Psql) HandleMints(dbBlockID int64, mints []*pb.Mint) error {
}

func (p *Psql) insertBurns(dbTransactionID int64, burn *pb.Burn) (dbMintID int64, err error) {
row := p.tx.QueryRow("INSERT INTO hivemapper.burns (transaction_id, from_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, burn.From, burn.Amount)
row := p.tx.QueryRow("INSERT INTO solana_tokens.burns (transaction_id, from_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, burn.From, burn.Amount)
err = row.Err()
if err != nil {
return 0, fmt.Errorf("inserting burn: %w", err)
Expand All @@ -189,15 +189,15 @@ func (p *Psql) HandleBurns(dbBlockID int64, burns []*pb.Burn) error {
}

func (p *Psql) StoreCursor(cursor *sink.Cursor) error {
_, err := p.tx.Exec("INSERT INTO hivemapper.cursor (name, cursor) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET cursor = $2", "hivemapper", cursor.String())
_, err := p.tx.Exec("INSERT INTO solana_tokens.cursor (name, cursor) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET cursor = $2", "token_tracker", cursor.String())
if err != nil {
return fmt.Errorf("inserting cursor: %w", err)
}
return nil
}

func (p *Psql) FetchCursor() (*sink.Cursor, error) {
rows, err := p.db.Query("SELECT cursor FROM hivemapper.cursor WHERE name = $1", "hivemapper")
rows, err := p.db.Query("SELECT cursor FROM solana_tokens.cursor WHERE name = $1", "token_tracker")
if err != nil {
return nil, fmt.Errorf("selecting cursor: %w", err)
}
Expand Down

0 comments on commit bc7a5ce

Please sign in to comment.