Skip to content

Commit

Permalink
Switch Scheduler to use yieldTo() instead of yield(). Scheduler prope…
Browse files Browse the repository at this point in the history
…r is now its own Fiber. No need to be "in" scheduler context in order to call async functions. Fix race condition in WorkerPool that would cause it to get stuck in idle().
  • Loading branch information
ccutrer committed May 3, 2009
1 parent 400474e commit c37d89e
Show file tree
Hide file tree
Showing 20 changed files with 305 additions and 336 deletions.
50 changes: 25 additions & 25 deletions mordor/common/asyncsocket.d
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ version (Windows) {
import tango.stdc.errno;
version (epoll) import tango.sys.linux.epoll;
}

extern (C) int printf(char* format, ...);
class AsyncSocket : Socket
{
public:
Expand Down Expand Up @@ -159,7 +159,7 @@ public:
exception("Unable to connect socket: ");
}
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
exception("Unable to connect socket: ");
Expand All @@ -170,7 +170,7 @@ public:
super.connect(to);
if (errno == EINPROGRESS) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
int err;
getOption(SocketOptionLevel.SOCKET, SocketOption.SO_ERROR, (cast(void*)&err)[0..int.sizeof]);
if (err != 0) {
Expand Down Expand Up @@ -203,7 +203,7 @@ public:
exception(
"Unable to accept socket connection");
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret && m_readEvent.lastError != ERROR_MORE_DATA) {
SetLastError(m_readEvent.lastError);
throw new SocketAcceptException(
Expand All @@ -216,7 +216,7 @@ public:
socket_t newsock = .accept(sock, null, null);
while (newsock == socket_t.init && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
newsock = .accept(sock, null, null);
}
if (newsock == socket_t.init) {
Expand Down Expand Up @@ -245,7 +245,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -255,7 +255,7 @@ public:
int rc = super.send(buf, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.send(buf, flags);
}
return rc;
Expand All @@ -272,7 +272,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
return ERROR;
Expand All @@ -282,7 +282,7 @@ public:
int rc = super.send(bufs, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.send(bufs, flags);
}
return rc;
Expand All @@ -302,7 +302,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -312,7 +312,7 @@ public:
int rc = super.sendTo(buf, flags, to);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.send(buf, flags);
}
return rc;
Expand All @@ -330,7 +330,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -340,7 +340,7 @@ public:
int rc = super.sendTo(bufs, flags, to);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.sendTo(bufs, flags, to);
}
return rc;
Expand All @@ -358,7 +358,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_writeEvent.ret) {
SetLastError(m_writeEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -368,7 +368,7 @@ public:
int rc = super.sendTo(bufs, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_writeEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.sendTo(bufs, flags);
}
return rc;
Expand All @@ -390,7 +390,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret) {
SetLastError(m_readEvent.lastError);
return ERROR;
Expand All @@ -400,7 +400,7 @@ public:
int rc = super.receive(buf, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.receive(buf, flags);
}
return rc;
Expand All @@ -420,7 +420,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret) {
SetLastError(m_readEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -430,7 +430,7 @@ public:
int rc = super.receive(bufs, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.receive(bufs, flags);
}
return rc;
Expand All @@ -454,7 +454,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret) {
SetLastError(m_readEvent.lastError);
return ERROR;
Expand All @@ -464,7 +464,7 @@ public:
int rc = super.receiveFrom(buf, flags, from);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.receiveFrom(buf, flags, from);
}
return rc;
Expand All @@ -483,7 +483,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret) {
SetLastError(m_readEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -493,7 +493,7 @@ public:
int rc = super.receiveFrom(bufs, flags, from);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.receiveFrom(bufs, flags, from);
}
return rc;
Expand All @@ -510,7 +510,7 @@ public:
if (ret && GetLastError() != WSA_IO_PENDING) {
return ret;
}
Fiber.yield();
Scheduler.getThis().yieldTo();
if (!m_readEvent.ret) {
SetLastError(m_readEvent.lastError);
return tango.net.Socket.SOCKET_ERROR;
Expand All @@ -520,7 +520,7 @@ public:
int rc = super.receiveFrom(bufs, flags);
while (rc == ERROR && errno == EAGAIN) {
_ioManager.registerEvent(&m_readEvent);
Fiber.yield();
Scheduler.getThis().yieldTo();
rc = super.receiveFrom(bufs, flags);
}
return rc;
Expand Down
24 changes: 10 additions & 14 deletions mordor/common/examples/benchmark.d
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ void main(char[][] args)
bool runClient = true;

if (runServer) {
g_ioManager.schedule(new Fiber(delegate void() {
Socket s = new AsyncSocket(g_ioManager, AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
s.bind(new InternetAddress("0.0.0.0", SERVER_PORT));
s.listen(10);

while(true) {
Socket newsocket = s.accept();
ServerConnection newconn = new ServerConnection(newsocket);
Scheduler.getThis.schedule(new Fiber(&newconn.run));
}
}));
Socket s = new AsyncSocket(g_ioManager, AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
s.bind(new InternetAddress("0.0.0.0", SERVER_PORT));
s.listen(10);

while(true) {
Socket newsocket = s.accept();
ServerConnection newconn = new ServerConnection(newsocket);
Scheduler.getThis.schedule(new Fiber(&newconn.run));
}
}

if (runClient) {
Expand All @@ -40,8 +38,6 @@ void main(char[][] args)
g_clients = new Fiber[g_totalConns];
ClientConnection.startTest(1);
}

g_ioManager.start(true);
}

class ServerConnection
Expand All @@ -54,7 +50,7 @@ public:

void run()
{
scope (exit) sock.shutdown(SocketShutdown.BOTH);
scope (exit) { sock.shutdown(SocketShutdown.BOTH); Scheduler.getThis.yieldTo(); }
while(true) {
ubyte[1] buffer;
int rc = sock.receive(buffer);
Expand Down
30 changes: 15 additions & 15 deletions mordor/common/examples/cat.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ void main(string[] args)

Stream stdout = new StdoutStream;

WorkerPool pool = new WorkerPool("pool", 1);
WorkerPool mainpool = new WorkerPool("main");
WorkerPool pool = new WorkerPool("pool", 2, false);
pool.switchTo();

pool.schedule(new Fiber(delegate void() {
if (args.length == 1)
args ~= "-";
foreach(string arg; args[1..$]) {
Stream inStream;
if (arg == "-")
inStream = new StdinStream;
else
inStream = new FileStream(arg, FileStream.Flags.READ);
if (args.length == 1)
args ~= "-";
foreach(string arg; args[1..$]) {
Stream inStream;
if (arg == "-")
inStream = new StdinStream;
else
inStream = new FileStream(arg, FileStream.Flags.READ);

transferStream(inStream, stdout);
}
pool.stop();
}));
pool.start(true);
transferStream(inStream, stdout);
}
mainpool.switchTo();
pool.stop();
}
23 changes: 10 additions & 13 deletions mordor/common/examples/echoserver.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,15 @@ void main(char[][])
{
IOManager ioManager = new IOManager(5);

ioManager.schedule(new Fiber(delegate void() {
Socket s = new AsyncSocket(ioManager, AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
s.bind(new InternetAddress("127.0.0.1", 8000));
s.listen(10);

while(true) {
Socket newsocket = s.accept();
Connection newconn = new Connection(newsocket);
Scheduler.getThis.schedule(new Fiber(&newconn.run));
}
}));

ioManager.start(true);
Socket s = new AsyncSocket(ioManager, AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
s.bind(new InternetAddress("127.0.0.1", 8000));
s.listen(10);

while(true) {
Socket newsocket = s.accept();
Connection newconn = new Connection(newsocket);
ioManager.schedule(new Fiber(&newconn.run));
}
}

class Connection
Expand All @@ -35,6 +31,7 @@ public:

void run()
{
scope (exit) Scheduler.getThis().yieldTo();
while(true) {
Stdout.formatln("starting a new read");
void[] buffer = new void[4096];
Expand Down
Loading

0 comments on commit c37d89e

Please sign in to comment.