Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arbitrary message size #457

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions litrpc/lndcrpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ type LndcRpcClient struct {
requestNonce uint64
requestNonceMtx sync.Mutex
responseChannelMtx sync.Mutex
responseChannels map[uint64]chan lnutil.RemoteControlRpcResponseMsg
responseChannels map[uint64]chan *lnutil.RemoteControlRpcResponseMsg
key *koblitz.PrivateKey
conMtx sync.Mutex

chunksOfMsg map[int64]*lnutil.ChunkMsg
}

// LndcRpcCanConnectLocally checks if we can connect to lit using the normal
Expand Down Expand Up @@ -116,9 +118,12 @@ func NewLndcRpcClient(address string, key *koblitz.PrivateKey) (*LndcRpcClient,
var err error

cli := new(LndcRpcClient)

cli.chunksOfMsg = make(map[int64]*lnutil.ChunkMsg)

// Create a map of chan objects to receive returned responses on. These channels
// are sent to from the ReceiveLoop, and awaited in the Call method.
cli.responseChannels = make(map[uint64]chan lnutil.RemoteControlRpcResponseMsg)
cli.responseChannels = make(map[uint64]chan *lnutil.RemoteControlRpcResponseMsg)

//Parse the address we're connecting to
who, where := lnutil.ParseAdrString(address)
Expand Down Expand Up @@ -158,7 +163,7 @@ func (cli *LndcRpcClient) Call(serviceMethod string, args interface{}, reply int

// Create the channel to receive the reply on
cli.responseChannelMtx.Lock()
cli.responseChannels[nonce] = make(chan lnutil.RemoteControlRpcResponseMsg)
cli.responseChannels[nonce] = make(chan *lnutil.RemoteControlRpcResponseMsg)
cli.responseChannelMtx.Unlock()

// Send the message in a goroutine
Expand Down Expand Up @@ -221,6 +226,35 @@ func (cli *LndcRpcClient) ReceiveLoop() {
return
}
msg = msg[:n]

if msg[0] == lnutil.MSGID_CHUNKS_BEGIN {

beginChunksMsg, _ := lnutil.NewChunksBeginMsgFromBytes(msg, 0)

msg_tmp := new(lnutil.ChunkMsg)
msg_tmp.TimeStamp = beginChunksMsg.TimeStamp
cli.chunksOfMsg[beginChunksMsg.TimeStamp] = msg_tmp

continue
}

if msg[0] == lnutil.MSGID_CHUNK_BODY {

chunkMsg, _ := lnutil.NewChunkMsgFromBytes(msg, 0)
cli.chunksOfMsg[chunkMsg.TimeStamp].Data = append(cli.chunksOfMsg[chunkMsg.TimeStamp].Data, chunkMsg.Data...)

continue
}

if msg[0] == lnutil.MSGID_CHUNKS_END {

endChunksMsg, _ := lnutil.NewChunksBeginMsgFromBytes(msg, 0)
msg = cli.chunksOfMsg[endChunksMsg.TimeStamp].Data

}



// We only care about RPC responses (for now)
if msg[0] == lnutil.MSGID_REMOTE_RPCRESPONSE {
// Parse the received message
Expand All @@ -239,7 +273,7 @@ func (cli *LndcRpcClient) ReceiveLoop() {
// reply and therefore, it could have not blocked and just
// ignore the return value.
select {
case responseChan <- response:
case responseChan <- &response:
default:
}

Expand Down
46 changes: 41 additions & 5 deletions lndc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"net"
"time"
"encoding/binary"

"github.com/mit-dci/lit/crypto/koblitz"
"github.com/mit-dci/lit/lnutil"
Expand Down Expand Up @@ -151,27 +152,62 @@ func (c *Conn) Write(b []byte) (n int, err error) {

// If we need to split the message into fragments, then we'll write
// chunks which maximize usage of the available payload.
chunkSize := math.MaxUint16
chunkSize := math.MaxUint16 - 1000

lenb := int(len(b))
curts := (time.Now().UnixNano())

beginChunksMsgBodyBytes := new(bytes.Buffer)
beginChunksMsgBodyBytes.WriteByte(lnutil.MSGID_CHUNKS_BEGIN)
binary.Write(beginChunksMsgBodyBytes, binary.BigEndian, curts)

// Start saving chunks
chunk_err := c.noise.WriteMessage(c.conn, beginChunksMsgBodyBytes.Bytes())
if chunk_err != nil {
return 0, chunk_err
}

bytesToWrite := len(b)
bytesWritten := 0
bytesToWrite := lenb

for bytesWritten < bytesToWrite {
// If we're on the last chunk, then truncate the chunk size as
// necessary to avoid an out-of-bounds array memory access.
if bytesWritten+chunkSize > len(b) {
chunkSize = len(b) - bytesWritten
chunkSize = lenb - bytesWritten
}

// Slice off the next chunk to be written based on our running
// counter and next chunk size.
chunk := b[bytesWritten : bytesWritten+chunkSize]
if err := c.noise.WriteMessage(c.conn, chunk); err != nil {
return bytesWritten, err

// Wrap chunk in a MSGID_CHUNK_BODY message
chunkMsgBodyBytes := new(bytes.Buffer)
chunkMsgBodyBytes.WriteByte(lnutil.MSGID_CHUNK_BODY)
binary.Write(chunkMsgBodyBytes, binary.BigEndian, curts)
binary.Write(chunkMsgBodyBytes, binary.BigEndian, int32(chunkSize))
chunkMsgBodyBytes.Write(chunk)


chunk_err = c.noise.WriteMessage(c.conn, chunkMsgBodyBytes.Bytes())
if chunk_err != nil {
return 0, chunk_err
}

bytesWritten += len(chunk)
}

// Actually send a message (unwrap and send)
endChunksMsgBodyBytes := new(bytes.Buffer)
endChunksMsgBodyBytes.WriteByte(lnutil.MSGID_CHUNKS_END)
binary.Write(endChunksMsgBodyBytes, binary.BigEndian, curts)

chunk_err = c.noise.WriteMessage(c.conn, endChunksMsgBodyBytes.Bytes())
if chunk_err != nil {
return 0, chunk_err
}


return bytesWritten, nil
}

Expand Down
41 changes: 41 additions & 0 deletions lnp2p/msgproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/mit-dci/lit/logging"
"sync"
"github.com/mit-dci/lit/lnutil"
)

// ParseFuncType is the type of a Message parser function.
Expand All @@ -28,6 +29,8 @@ type MessageProcessor struct {
// TODO Evaluate if this mutex is even necessary?
active bool
actmtx *sync.Mutex

ChunksOfMsg map[int64]*lnutil.ChunkMsg
}

// NewMessageProcessor processes messages coming in from over the network.
Expand All @@ -36,6 +39,7 @@ func NewMessageProcessor() MessageProcessor {
handlers: [256]*messagehandler{},
active: false,
actmtx: &sync.Mutex{},
ChunksOfMsg: make(map[int64]*lnutil.ChunkMsg),
}
}

Expand Down Expand Up @@ -77,6 +81,43 @@ func (mp *MessageProcessor) HandleMessage(peer *Peer, buf []byte) error {

// First see if we have handlers defined for this message type.
mtype := buf[0]

if mtype == 0xB2{

msg, _ := lnutil.NewChunksBeginMsgFromBytes(buf, peer.GetIdx())

chunk_msg := new(lnutil.ChunkMsg)
chunk_msg.TimeStamp = msg.TimeStamp

mp.ChunksOfMsg[msg.TimeStamp] = chunk_msg

return nil

}

if mtype == 0xB3{

msg, _ := lnutil.NewChunkMsgFromBytes(buf, peer.GetIdx())
mp.ChunksOfMsg[msg.TimeStamp].Data = append(mp.ChunksOfMsg[msg.TimeStamp].Data, msg.Data...)

return nil

}

if mtype == 0xB4{

msg, _ := lnutil.NewChunksEndMsgFromBytes(buf, peer.GetIdx())

buf = mp.ChunksOfMsg[msg.TimeStamp].Data
mtype = buf[0]

delete(mp.ChunksOfMsg, msg.TimeStamp)

}




h := mp.handlers[mtype]
if h == nil {
return fmt.Errorf("no handler found for messasge of type %x", mtype)
Expand Down
20 changes: 20 additions & 0 deletions lnp2p/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"time"
"math"

"github.com/mit-dci/lit/btcutil/hdkeychain"
"github.com/mit-dci/lit/crypto/koblitz"
Expand Down Expand Up @@ -87,6 +88,25 @@ func NewPeerManager(rootkey *hdkeychain.ExtendedKey, pdb lncore.LitPeerStorage,
mtx: &sync.Mutex{},
}


// Clear ChunksOfMsg in case of incomplete chunks transmittion.
// Try to clean the map every 5 minutes. Therefore message have to
// be transmitted within a 5 minutes.
go func(){

for {

time.Sleep(5 * time.Minute)

for k := range pm.mproc.ChunksOfMsg {
tdelta := time.Now().UnixNano() - k
if tdelta > 3*int64(math.Pow10(11)) {
delete(pm.mproc.ChunksOfMsg, k)
}
}
}
}()

return pm, nil
}

Expand Down
123 changes: 123 additions & 0 deletions lnutil/msglib.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ const (
MSGID_REMOTE_RPCREQUEST = 0xB0 // Contains an RPC request from a remote peer
MSGID_REMOTE_RPCRESPONSE = 0xB1 // Contains an RPC response to send to a remote peer

MSGID_CHUNKS_BEGIN uint8 = 0xB2
MSGID_CHUNK_BODY uint8 = 0xB3
MSGID_CHUNKS_END uint8 = 0xB4


DIGEST_TYPE_SHA256 = 0x00
DIGEST_TYPE_RIPEMD160 = 0x01
)
Expand Down Expand Up @@ -2412,3 +2417,121 @@ func (msg RemoteControlRpcResponseMsg) Peer() uint32 {
func (msg RemoteControlRpcResponseMsg) MsgType() uint8 {
return MSGID_REMOTE_RPCRESPONSE
}


// For chunked messages

type BeginChunksMsg struct {
PeerIdx uint32
TimeStamp int64
}

func NewChunksBeginMsgFromBytes(b []byte, peerIdx uint32) (BeginChunksMsg, error) {

msg := new(BeginChunksMsg)
buf := bytes.NewBuffer(b[1:]) // get rid of messageType

msg.PeerIdx = peerIdx
binary.Read(buf, binary.BigEndian, &msg.TimeStamp)

return *msg, nil
}

func (msg BeginChunksMsg) Bytes() []byte {
var buf bytes.Buffer

buf.WriteByte(msg.MsgType())
binary.Write(&buf, binary.BigEndian, msg.TimeStamp)

return buf.Bytes()
}

func (msg BeginChunksMsg) Peer() uint32 {
return msg.PeerIdx
}

func (msg BeginChunksMsg) MsgType() uint8 {
return MSGID_CHUNKS_BEGIN
}


type ChunkMsg struct {
PeerIdx uint32
TimeStamp int64
ChunkSize int32
Data []byte
}


func NewChunkMsgFromBytes(b []byte, peerIdx uint32) (ChunkMsg, error) {

msg := new(ChunkMsg)

buf := bytes.NewBuffer(b[1:]) // get rid of messageType

msg.PeerIdx = peerIdx
binary.Read(buf, binary.BigEndian, &msg.TimeStamp)
binary.Read(buf, binary.BigEndian, &msg.ChunkSize)

msg.Data = make([]byte, msg.ChunkSize)
binary.Read(buf, binary.BigEndian, msg.Data)

return *msg, nil

}


func (msg ChunkMsg) Bytes() []byte {
var buf bytes.Buffer

buf.WriteByte(msg.MsgType())
binary.Write(&buf, binary.BigEndian, msg.TimeStamp)
binary.Write(&buf, binary.BigEndian, msg.ChunkSize)
buf.Write(msg.Data)

return buf.Bytes()
}

func (msg ChunkMsg) Peer() uint32 {
return msg.PeerIdx
}

func (msg ChunkMsg) MsgType() uint8 {
return MSGID_CHUNK_BODY
}



type EndChunksMsg struct {
PeerIdx uint32
TimeStamp int64
}

func NewChunksEndMsgFromBytes(b []byte, peerIdx uint32) (EndChunksMsg, error) {


msg := new(EndChunksMsg)
buf := bytes.NewBuffer(b[1:]) // get rid of messageType

msg.PeerIdx = peerIdx
binary.Read(buf, binary.BigEndian, &msg.TimeStamp)

return *msg, nil
}

func (msg EndChunksMsg) Bytes() []byte {
var buf bytes.Buffer

buf.WriteByte(msg.MsgType())
binary.Write(&buf, binary.BigEndian, msg.TimeStamp)

return buf.Bytes()
}

func (msg EndChunksMsg) Peer() uint32 {
return msg.PeerIdx
}

func (msg EndChunksMsg) MsgType() uint8 {
return MSGID_CHUNKS_END
}