diff --git a/lua/autorun/netstream.lua b/lua/autorun/netstream.lua index 56e4d39..46ab2df 100644 --- a/lua/autorun/netstream.lua +++ b/lua/autorun/netstream.lua @@ -3,153 +3,301 @@ AddCSLuaFile() net.Stream = {} -net.Stream.ReadStreamQueues = {} --This holds a read stream for each player, or one read stream for the server if running on the CLIENT -net.Stream.WriteStreams = {} --This holds the write streams -net.Stream.SendSize = 20000 --This is the maximum size of each stream to send -net.Stream.Timeout = 30 --How long the data should exist in the store without being used before being destroyed -net.Stream.MaxServerReadStreams = 128 --The maximum number of keep-alives to have queued. This should prevent naughty players from flooding the network with keep-alive messages. -net.Stream.MaxServerChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB +net.Stream.SendSize = 20000 --This is the size of each packet to send +net.Stream.Timeout = 10 --How long to wait for client response before cleaning up +net.Stream.MaxWriteStreams = 1024 --The maximum number of write data items to store +net.Stream.MaxReadStreams = 128 --The maximum number of queued read data items to store +net.Stream.MaxChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB +net.Stream.MaxSize = net.Stream.SendSize*net.Stream.MaxChunks net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data -net.Stream.MaxKeepalive = 15 --Maximum times the client may request data stay live -net.Stream.ReadStream = {} ---Send the data sender a request for data -function net.Stream.ReadStream:Request() - if self.downloads == net.Stream.MaxTries * self.numchunks then self:Remove() return end - self.downloads = self.downloads + 1 - -- print("Requesting",self.identifier,false,false,#self.chunks) +local WriteStreamQueue = { + __index = { + Add = function(self, stream) + local identifier = self.curidentifier + local startid = identifier + while self.queue[identifier] do + identifier = identifier % net.Stream.MaxWriteStreams + 1 + if identifier == startid then + ErrorNoHalt("Netstream is full of WriteStreams!") + net.WriteUInt(0, 32) + return + end + end + self.curidentifier = identifier % net.Stream.MaxWriteStreams + 1 - net.Start("NetStreamRequest") - net.WriteUInt(self.identifier, 32) - net.WriteBit(false) - net.WriteBit(false) - net.WriteUInt(#self.chunks, 32) - if CLIENT then net.SendToServer() else net.Send(self.player) end + if next(self.queue)==nil then + self.activitytimeout = CurTime()+net.Stream.Timeout + timer.Create("netstream_queueclean", 5, 0, function() self:Clean() end) + end + self.queue[identifier] = stream + stream.identifier = identifier + return stream + end, + + Write = function(self, ply) + local identifier = net.ReadUInt(32) + local chunkidx = net.ReadUInt(32) + local stream = self.queue[identifier] + --print("Got request", identifier, chunkidx, stream) + if stream then + if stream:Write(ply, chunkidx) then + self.activitytimeout = CurTime()+net.Stream.Timeout + stream.timeout = CurTime()+net.Stream.Timeout + end + else + -- Tell them the stream doesn't exist + net.Start("NetStreamRead") + net.WriteUInt(identifier, 32) + net.WriteUInt(0, 32) + if SERVER then net.Send(ply) else net.SendToServer() end + end + end, + + Clean = function(self) + local t = CurTime() + for k, stream in pairs(self.queue) do + if (next(stream.clients)~=nil and t >= stream.timeout) or t >= self.activitytimeout then + stream:Remove() + self.queue[k] = nil + end + end + if next(self.queue)==nil then + timer.Remove("netstream_queueclean") + end + end, + }, + __call = function(t) + return setmetatable({ + activitytimeout = CurTime()+net.Stream.Timeout, + curidentifier = 1, + queue = {} + }, t) + end +} +setmetatable(WriteStreamQueue, WriteStreamQueue) +net.Stream.WriteStreams = WriteStreamQueue() + +local ReadStreamQueue = { + __index = { + Add = function(self, stream) + local queue = self.queues[stream.player] + + if #queue == net.Stream.MaxReadStreams then + ErrorNoHalt("Receiving too many ReadStream requests!") + return + end + + for _, v in ipairs(queue) do + if v.identifier == stream.identifier then + ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!") + return + end + end - timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout/2, 1, function() self:Request() end) + queue[#queue+1] = stream + if #queue == 1 then + stream:Request() + end + return stream + end, + + Remove = function(self, stream) + local queue = rawget(self.queues, stream.player) + if queue then + if stream == queue[1] then + table.remove(queue, 1) + local nextInQueue = queue[1] + if nextInQueue then + nextInQueue:Request() + else + self.queues[stream.player] = nil + end + else + for k, v in ipairs(queue) do + if v == stream then + table.remove(queue, k) + break + end + end + end + end + end, -end + Read = function(self, ply) + local identifier = net.ReadUInt(32) + local queue = rawget(self.queues, ply) + if queue and queue[1] then + queue[1]:Read(identifier) + end + end + }, + __call = function(t) + return setmetatable({ + queues = setmetatable({}, {__index = function(t,k) local r={} t[k]=r return r end}) + }, t) + end +} +setmetatable(ReadStreamQueue, ReadStreamQueue) +net.Stream.ReadStreams = ReadStreamQueue() ---Received data so process it -function net.Stream.ReadStream:Read(size) - timer.Remove("NetStreamReadTimeout" .. self.identifier) - local progress = net.ReadUInt(32) - if self.chunks[progress] then return end +local WritingDataItem = { + __index = { + Write = function(self, ply, chunkidx) + local client = self.clients[ply] + if client.finished then return false end + if chunkidx == #self.chunks+1 then self:Finished(ply) return true end - local crc = net.ReadString() - local data = net.ReadData(size) + if client.downloads+#self.chunks-client.progress >= net.Stream.MaxTries * #self.chunks then self:Finished(ply) return false end + client.downloads = client.downloads + 1 - if crc == util.CRC(data) then - self.chunks[progress] = data - end - if #self.chunks == self.numchunks then - self.returndata = table.concat(self.chunks) - if self.compressed then - self.returndata = util.Decompress(self.returndata) - end - self:Remove() - else - self:Request() - end + local chunk = self.chunks[chunkidx] + if not chunk then return false end -end + client.progress = chunkidx ---Gets the download progress -function net.Stream.ReadStream:GetProgress() - return #self.chunks/self.numchunks -end + --print("Sending", "NetStreamRead", self.identifier, #chunk.data, chunkidx, chunk.crc) + net.Start("NetStreamRead") + net.WriteUInt(self.identifier, 32) + net.WriteUInt(#chunk.data, 32) + net.WriteUInt(chunkidx, 32) + net.WriteString(chunk.crc) + net.WriteData(chunk.data, #chunk.data) + if CLIENT then net.SendToServer() else net.Send(ply) end + return true + end, + + Finished = function(self, ply) + self.clients[ply].finished = true + if self.callback then + local ok, err = xpcall(self.callback, debug.traceback, ply) + if not ok then ErrorNoHalt(err) end + end + end, + + GetProgress = function(self, ply) + return self.clients[ply].progress / #self.chunks + end, + + Remove = function(self) + local sendTo = {} + for ply, client in pairs(self.clients) do + if not client.finished then + client.finished = true + if CLIENT or ply:IsValid() then sendTo[#sendTo+1] = ply end + end + end ---Pop the queue and start the next task -function net.Stream.ReadStream:Remove() - - local ok, err = xpcall(self.callback, debug.traceback, self.returndata) - if not ok then ErrorNoHalt(err) end - - net.Start("NetStreamRequest") - net.WriteUInt(self.identifier, 32) - net.WriteBit(false) - net.WriteBit(true) - if CLIENT then net.SendToServer() else net.Send(self.player) end - - timer.Remove("NetStreamReadTimeout" .. self.identifier) - timer.Remove("NetStreamKeepAlive" .. self.identifier) - - if self == self.queue[1] then - table.remove(self.queue, 1) - local nextInQueue = self.queue[1] - if nextInQueue then - timer.Remove("NetStreamKeepAlive" .. nextInQueue.identifier) - nextInQueue:Request() - else - net.Stream.ReadStreamQueues[self.player] = nil - end - else - for k, v in ipairs(self.queue) do - if v == self then - table.remove(self.queue, k) - break + if next(sendTo)~=nil then + --print("Sending", "NetStreamRead", self.identifier, 0) + net.Start("NetStreamRead") + net.WriteUInt(self.identifier, 32) + net.WriteUInt(0, 32) + if SERVER then net.Send(sendTo) else net.SendToServer() end end end - end -end -net.Stream.ReadStream.__index = net.Stream.ReadStream - -net.Stream.WriteStream = {} - --- The player wants some data -function net.Stream.WriteStream:Write(ply) - local progress = net.ReadUInt(32)+1 - local chunk = self.chunks[progress] - if chunk then - self.clients[ply].progress = progress - net.Start("NetStreamDownload") - net.WriteUInt(#chunk.data, 32) - net.WriteUInt(progress, 32) - net.WriteString(chunk.crc) - net.WriteData(chunk.data, #chunk.data) - if CLIENT then net.SendToServer() else net.Send(ply) end - end -end + }, + __call = function(t, data, callback) + local chunks = {} + for i=1, math.ceil(#data / net.Stream.SendSize) do + local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize) + chunks[i] = { data = datachunk, crc = util.CRC(datachunk) } + end --- The player notified us they finished downloading or cancelled -function net.Stream.WriteStream:Finished(ply) - self.clients[ply].finished = true - if self.callback then - local ok, err = xpcall(self.callback, debug.traceback, ply) - if not ok then ErrorNoHalt(err) end + return setmetatable({ + timeout = CurTime()+net.Stream.Timeout, + chunks = chunks, + callback = callback, + lasttouched = 0, + clients = setmetatable({},{__index = function(t,k) + local r = { + finished = false, + downloads = 0, + progress = 0, + } t[k]=r return r + end}) + }, t) end -end +} +setmetatable(WritingDataItem, WritingDataItem) + +local ReadingDataItem = { + __index = { + Request = function(self) + if self.downloads+self.numchunks-#self.chunks >= net.Stream.MaxTries*self.numchunks then self:Remove() return end + self.downloads = self.downloads + 1 + timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout*0.5, 1, function() self:Request() end) + self:WriteRequest() + end, + + WriteRequest = function(self) + --print("Requesting", self.identifier, #self.chunks) + net.Start("NetStreamWrite") + net.WriteUInt(self.identifier, 32) + net.WriteUInt(#self.chunks+1, 32) + if CLIENT then net.SendToServer() else net.Send(self.player) end + end, + + Read = function(self, identifier) + if self.identifier ~= identifier then self:Request() return end + + local size = net.ReadUInt(32) + if size == 0 then self:Remove() return end + + local chunkidx = net.ReadUInt(32) + if chunkidx ~= #self.chunks+1 then self:Request() return end + + local crc = net.ReadString() + local data = net.ReadData(size) + + if crc ~= util.CRC(data) then self:Request() return end + + self.chunks[chunkidx] = data + if #self.chunks == self.numchunks then self:Remove(true) return end + + self:Request() + end, + + GetProgress = function(self) + return #self.chunks/self.numchunks + end, + + Remove = function(self, finished) + timer.Remove("NetStreamReadTimeout" .. self.identifier) + + local data + if finished then + data = table.concat(self.chunks) + if self.compressed then + data = util.Decompress(data, net.Stream.MaxSize) + end + self:WriteRequest() -- Notify we finished + end --- Get player's download progress -function net.Stream.WriteStream:GetProgress(ply) - return self.clients[ply].progress / #self.chunks -end + local ok, err = xpcall(self.callback, debug.traceback, data) + if not ok then ErrorNoHalt(err) end --- If the stream owner cancels it, notify everyone who is subscribed -function net.Stream.WriteStream:Remove() - local sendTo = {} - for ply, client in pairs(self.clients) do - if not client.finished then - client.finished = true - if ply:IsValid() then sendTo[#sendTo+1] = ply end + net.Stream.ReadStreams:Remove(self) end + }, + __call = function(t, ply, callback, numchunks, identifier, compressed) + return setmetatable({ + identifier = identifier, + chunks = {}, + compressed = compressed, + numchunks = numchunks, + callback = callback, + player = ply, + downloads = 0 + }, t) end +} +setmetatable(ReadingDataItem, ReadingDataItem) - net.Start("NetStreamDownload") - net.WriteUInt(0, 32) - net.WriteUInt(self.identifier, 32) - if SERVER then net.Send(sendTo) else net.SendToServer() end - net.Stream.WriteStreams[self.identifier] = nil -end - -net.Stream.WriteStream.__index = net.Stream.WriteStream ---Store the data and write the file info so receivers can request it. -local identifier = 1 function net.WriteStream(data, callback, dontcompress) - if not isstring(data) then error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2) end @@ -167,54 +315,18 @@ function net.WriteStream(data, callback, dontcompress) return end - local numchunks = math.ceil(#data / net.Stream.SendSize) - if CLIENT and numchunks > net.Stream.MaxServerChunks then + if #data > net.Stream.MaxSize then ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB") net.WriteUInt(0, 32) return end - local chunks = {} - for i=1, numchunks do - local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize) - chunks[i] = { - data = datachunk, - crc = util.CRC(datachunk), - } - end + local stream = net.Stream.WriteStreams:Add(WritingDataItem(data, callback, compressed)) + if not stream then return end - local startid = identifier - while net.Stream.WriteStreams[identifier] do - identifier = identifier % 1024 + 1 - if identifier == startid then - ErrorNoHalt("Netstream is full of WriteStreams!") - net.WriteUInt(0, 32) - return - end - end - - local stream = { - identifier = identifier, - chunks = chunks, - compressed = compressed, - numchunks = numchunks, - callback = callback, - clients = setmetatable({},{__index = function(t,k) - local r = { - finished = false, - downloads = 0, - keepalives = 0, - progress = 0, - } t[k]=r return r - end}) - } - setmetatable(stream, net.Stream.WriteStream) - - net.Stream.WriteStreams[identifier] = stream - timer.Create("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1, function() stream:Remove() end) - - net.WriteUInt(numchunks, 32) - net.WriteUInt(identifier, 32) + --print("WriteStream", #stream.chunks, stream.identifier, compressed) + net.WriteUInt(#stream.chunks, 32) + net.WriteUInt(stream.identifier, 32) net.WriteBool(compressed) return stream @@ -223,7 +335,6 @@ end --If the receiver is a player then add it to a queue. --If the receiver is the server then add it to a queue for each individual player function net.ReadStream(ply, callback) - if CLIENT then ply = NULL else @@ -236,17 +347,7 @@ function net.ReadStream(ply, callback) if not isfunction(callback) then error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2) end - - local queue = net.Stream.ReadStreamQueues[ply] - if queue then - if SERVER and #queue == net.Stream.MaxServerReadStreams then - ErrorNoHalt("Receiving too many ReadStream requests from ", ply) - return - end - else - queue = {} net.Stream.ReadStreamQueues[ply] = queue - end - + local numchunks = net.ReadUInt(32) if numchunks == nil then return @@ -255,110 +356,31 @@ function net.ReadStream(ply, callback) if not ok then ErrorNoHalt(err) end return end - if SERVER and numchunks > net.Stream.MaxServerChunks then - ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB") - return - end local identifier = net.ReadUInt(32) local compressed = net.ReadBool() - --print("Got info", numchunks, identifier, compressed) - for _, v in ipairs(queue) do - if v.identifier == identifier then - ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!") - return - end + if numchunks > net.Stream.MaxChunks then + ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB") + return end - local stream = { - identifier = identifier, - chunks = {}, - compressed = compressed, - numchunks = numchunks, - callback = callback, - queue = queue, - player = ply, - downloads = 0 - } - setmetatable(stream, net.Stream.ReadStream) - - queue[#queue + 1] = stream - if #queue > 1 then - timer.Create("NetStreamKeepAlive" .. identifier, net.Stream.Timeout / 2, 0, function() - net.Start("NetStreamRequest") - net.WriteUInt(identifier, 32) - net.WriteBit(true) - if CLIENT then net.SendToServer() else net.Send(ply) end - end) - else - stream:Request() - end + --print("ReadStream", numchunks, identifier, compressed) - return stream + return net.Stream.ReadStreams:Add(ReadingDataItem(ply, callback, numchunks, identifier, compressed)) end if SERVER then - - util.AddNetworkString("NetStreamRequest") - util.AddNetworkString("NetStreamDownload") - + util.AddNetworkString("NetStreamWrite") + util.AddNetworkString("NetStreamRead") end ---Stream data is requested -net.Receive("NetStreamRequest", function(len, ply) - - local identifier = net.ReadUInt(32) - local stream = net.Stream.WriteStreams[identifier] - - if stream then - ply = ply or NULL - local client = stream.clients[ply] - - if not client.finished then - local keepalive = net.ReadBit() == 1 - if keepalive then - if client.keepalives < net.Stream.MaxKeepalive then - client.keepalives = client.keepalives + 1 - timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1) - end - else - local completed = net.ReadBit() == 1 - if completed then - stream:Finished(ply) - else - if client.downloads < net.Stream.MaxTries * #stream.chunks then - client.downloads = client.downloads + 1 - stream:Write(ply) - timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1) - else - client.finished = true - end - end - end - end - end - +--Send requested stream data +net.Receive("NetStreamWrite", function(len, ply) + net.Stream.WriteStreams:Write(ply or NULL) end) ---Download the stream data -net.Receive("NetStreamDownload", function(len, ply) - - ply = ply or NULL - local queue = net.Stream.ReadStreamQueues[ply] - if queue then - local size = net.ReadUInt(32) - if size > 0 then - queue[1]:Read(size) - else - local id = net.ReadUInt(32) - for k, v in ipairs(queue) do - if v.identifier == id then - v:Remove() - break - end - end - end - end - +--Download the sent stream data +net.Receive("NetStreamRead", function(len, ply) + net.Stream.ReadStreams:Read(ply or NULL) end)