Skip to content

Commit

Permalink
working datapath draft changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nigriMSFT committed Dec 22, 2023
1 parent 39628b3 commit fa78ca9
Show file tree
Hide file tree
Showing 22 changed files with 2,623 additions and 1,231 deletions.
5 changes: 1 addition & 4 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ QuicWorkerInitialize(
Worker->ExecutionContext.NextTimeUs = UINT64_MAX;
Worker->ExecutionContext.Ready = TRUE;

#ifndef _KERNEL_MODE // Not supported on kernel mode
if (ExecProfile != QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT) {
Worker->IsExternal = TRUE;
CxPlatAddExecutionContext(&Worker->ExecutionContext, PartitionIndex);
} else
#endif // _KERNEL_MODE
{
} else {
const uint16_t ThreadFlags =
ExecProfile == QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME ?
CXPLAT_THREAD_FLAG_SET_AFFINITIZE : CXPLAT_THREAD_FLAG_NONE;
Expand Down
5 changes: 0 additions & 5 deletions src/inc/quic_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,6 @@ typedef struct CXPLAT_EXECUTION_CONTEXT {

} CXPLAT_EXECUTION_CONTEXT;

#ifdef _KERNEL_MODE // Not supported on kernel mode
#define CxPlatAddExecutionContext(Context, IdealProcessor) CXPLAT_FRE_ASSERT(FALSE)
#define CxPlatWakeExecutionContext(Context) CXPLAT_FRE_ASSERT(FALSE)
#else
void
CxPlatAddExecutionContext(
_Inout_ CXPLAT_EXECUTION_CONTEXT* Context,
Expand All @@ -477,7 +473,6 @@ void
CxPlatWakeExecutionContext(
_In_ CXPLAT_EXECUTION_CONTEXT* Context
);
#endif

//
// The "type" of the completion queue event is stored as the first uint32_t of
Expand Down
125 changes: 110 additions & 15 deletions src/inc/quic_platform_winkernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,12 @@ _CxPlatEventWaitWithTimeout(
)
{
LARGE_INTEGER Timeout100Ns;
Timeout100Ns.QuadPart = Int32x32To64(TimeoutMs, -10000);
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, &Timeout100Ns);
if (TimeoutMs == 0xffffffff) {
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, NULL);
} else {
Timeout100Ns.QuadPart = Int32x32To64(TimeoutMs, -10000);
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, &Timeout100Ns);
}
}
#define CxPlatEventWaitWithTimeout(Event, TimeoutMs) \
(STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(&Event, TimeoutMs))
Expand All @@ -471,16 +475,35 @@ _CxPlatEventWaitWithTimeout(
// Event Queue Interfaces
//

typedef KEVENT CXPLAT_EVENTQ; // Event queue
typedef void* CXPLAT_CQE;
typedef struct CXPLAT_EVENTQ {
CXPLAT_DISPATCH_LOCK Lock;
LIST_ENTRY Events;
CXPLAT_EVENT EventsAvailable;
} CXPLAT_EVENTQ;

typedef struct CXPLAT_CQE {
void* UserData;
} CXPLAT_CQE;

#define CXPLAT_SQE CXPLAT_SQE
#define CXPLAT_SQE_DEFAULT {0}
typedef struct CXPLAT_SQE {
void* UserData;
int Overlapped; // Used as the completion context to platform IO routines.
LIST_ENTRY Link;
BOOLEAN IsQueued; // Prevent double queueing.
} CXPLAT_SQE;

inline
BOOLEAN
CxPlatEventQInitialize(
_Out_ CXPLAT_EVENTQ* queue
)
{
KeInitializeEvent(queue, SynchronizationEvent, FALSE);
CxPlatZeroMemory(queue, sizeof(*queue));
CxPlatDispatchLockInitialize(&queue->Lock);
InitializeListHead(&queue->Events);
CxPlatEventInitialize(&queue->EventsAvailable, TRUE, FALSE);
return TRUE;
}

Expand All @@ -489,24 +512,55 @@ void
CxPlatEventQCleanup(
_In_ CXPLAT_EVENTQ* queue
)
{
CxPlatEventUninitialize(queue->EventsAvailable);
CXPLAT_DBG_ASSERT(IsListEmpty(&queue->Events));
CxPlatDispatchLockUninitialize(&queue->Lock);
}

inline
BOOLEAN
CxPlatEventQAssociateHandle(
_In_ CXPLAT_EVENTQ* queue,
_In_ HANDLE fileHandle
)
{
UNREFERENCED_PARAMETER(queue);
UNREFERENCED_PARAMETER(fileHandle);
return FALSE;
}

inline
BOOLEAN
_CxPlatEventQEnqueue(
CxPlatEventQEnqueue(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE* sqe,
_In_opt_ void* user_data
)
{
UNREFERENCED_PARAMETER(user_data);
KeSetEvent(queue, IO_NO_INCREMENT, FALSE);
BOOLEAN SignalEvent;

CxPlatDispatchLockAcquire(&queue->Lock);

if (sqe->IsQueued) {
CxPlatDispatchLockRelease(&queue->Lock);
return TRUE;
}

sqe->IsQueued = TRUE;
sqe->UserData = user_data;
SignalEvent = IsListEmpty(&queue->Events);
InsertTailList(&queue->Events, &sqe->Link);

CxPlatDispatchLockRelease(&queue->Lock);

if (SignalEvent) {
CxPlatEventSet(queue->EventsAvailable);
}

return TRUE;
}

#define CxPlatEventQEnqueue(queue, sqe, user_data) _CxPlatEventQEnqueue(queue, user_data)

inline
uint32_t
CxPlatEventQDequeue(
Expand All @@ -516,9 +570,40 @@ CxPlatEventQDequeue(
_In_ uint32_t wait_time // milliseconds
)
{
UNREFERENCED_PARAMETER(count);
*events = NULL;
return STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(queue, wait_time) ? 1 : 0;
LIST_ENTRY* Entry;
uint32_t EventsDequeued = 0;

// TODO: this fn signature needs better SAL
RtlZeroMemory(events, sizeof(*events));

CxPlatEventReset(queue->EventsAvailable);

CxPlatDispatchLockAcquire(&queue->Lock);

if (IsListEmpty(&queue->Events)) {
CxPlatDispatchLockRelease(&queue->Lock);
CxPlatEventWaitWithTimeout(queue->EventsAvailable, wait_time);
CxPlatDispatchLockAcquire(&queue->Lock);
}

while (EventsDequeued < count && !IsListEmpty(&queue->Events)) {
Entry = RemoveHeadList(&queue->Events);
CXPLAT_SQE* Sqe = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_SQE, Link);
// TODO: this fn signature needs better SAL
#pragma warning(push)
#pragma warning(disable:6386)
events[EventsDequeued].UserData = Sqe->UserData;
#pragma warning(pop)

CXPLAT_DBG_ASSERT(Sqe->IsQueued);
Sqe->IsQueued = FALSE;

EventsDequeued++;
}

CxPlatDispatchLockRelease(&queue->Lock);

return EventsDequeued;
}

inline
Expand All @@ -538,9 +623,17 @@ CxPlatCqeUserData(
_In_ const CXPLAT_CQE* cqe
)
{
return *cqe;
return cqe->UserData;
}

typedef struct DATAPATH_SQE DATAPATH_SQE;

void
CxPlatDatapathSqeInitialize(
_Out_ DATAPATH_SQE* DatapathSqe,
_In_ uint32_t CqeType
);

//
// Time Measurement Interfaces
//
Expand Down Expand Up @@ -711,7 +804,9 @@ CxPlatSleep(
KeWaitForSingleObject(&SleepTimer, Executive, KernelMode, FALSE, NULL);
}

#define CxPlatSchedulerYield() // no-op
#define CxPlatSchedulerYield() \
LARGE_INTEGER Timeout = {0}; \
KeDelayExecutionThread(KernelMode, FALSE, &Timeout)

//
// Create Thread Interfaces
Expand Down
2 changes: 1 addition & 1 deletion src/platform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ if("${CX_PLATFORM}" STREQUAL "windows")
${SYSTEM_PROCESSOR} STREQUAL "arm64ec")
set(SOURCES ${SOURCES} datapath_raw_dummy.c)
else()
set(SOURCES ${SOURCES} datapath_raw_win.c datapath_raw_socket.c datapath_raw_socket_win.c datapath_raw_xdp_win.c)
set(SOURCES ${SOURCES} datapath_raw_win.c datapath_raw_socket.c datapath_raw_socket_win.c datapath_raw_xdp_win.c datapath_raw_xdp_winuser.c)
endif()
else()
set(SOURCES ${SOURCES} inline.c platform_posix.c storage_posix.c cgroup.c)
Expand Down
33 changes: 20 additions & 13 deletions src/platform/datapath_raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ typedef struct CXPLAT_SOCKET_RAW {
CXPLAT_HASHTABLE_ENTRY Entry;
CXPLAT_RUNDOWN_REF Rundown;
CXPLAT_DATAPATH_RAW* RawDatapath;
#ifndef _KERNEL_MODE
SOCKET AuxSocket;
#endif
BOOLEAN Wildcard; // Using a wildcard local address. Optimization
// to avoid always reading LocalAddress.
uint8_t CibirIdLength; // CIBIR ID length. Value of 0 indicates CIBIR isn't used
Expand Down Expand Up @@ -374,14 +376,14 @@ CxPlatFramingWriteHeaders(
#pragma pack(push)
#pragma pack(1)

typedef struct ETHERNET_HEADER {
typedef struct CXPLAT_ETHERNET_HEADER {
uint8_t Destination[6];
uint8_t Source[6];
uint16_t Type;
uint8_t Data[0];
} ETHERNET_HEADER;
} CXPLAT_ETHERNET_HEADER;

typedef struct IPV4_HEADER {
typedef struct CXPLAT_IPV4_HEADER {
uint8_t VersionAndHeaderLength;
union {
uint8_t TypeOfServiceAndEcnField;
Expand All @@ -396,20 +398,20 @@ typedef struct IPV4_HEADER {
uint8_t TimeToLive;
uint8_t Protocol;
uint16_t HeaderChecksum;
uint8_t Source[4];
uint8_t Destination[4];
uint8_t SourceAddress[4];
uint8_t DestinationAddress[4];
uint8_t Data[0];
} IPV4_HEADER;
} CXPLAT_IPV4_HEADER;

typedef struct IPV6_HEADER {
typedef struct CXPLAT_IPV6_HEADER {
uint32_t VersionClassEcnFlow;
uint16_t PayloadLength;
uint8_t NextHeader;
uint8_t HopLimit;
uint8_t Source[16];
uint8_t Destination[16];
uint8_t SourceAddress[16];
uint8_t DestinationAddress[16];
uint8_t Data[0];
} IPV6_HEADER;
} CXPLAT_IPV6_HEADER;

typedef struct IPV6_EXTENSION {
uint8_t NextHeader;
Expand Down Expand Up @@ -456,11 +458,16 @@ typedef struct TCP_HEADER {
#define TH_CWR 0x80

#define IPV4_VERSION 4
#ifndef _KERNEL_MODE
#define IPV6_VERSION 6
#endif
#define IPV4_VERSION_BYTE (IPV4_VERSION << 4)
#define IPV4_DEFAULT_VERHLEN ((IPV4_VERSION_BYTE) | (sizeof(IPV4_HEADER) / sizeof(uint32_t)))
#ifndef _KERNEL_MODE
#define IPV4_DEFAULT_VERHLEN ((IPV4_VERSION_BYTE) | (sizeof(CXPLAT_IPV4_HEADER) / sizeof(uint32_t)))
#endif

#define IP_DEFAULT_HOP_LIMIT 128

#define ETHERNET_TYPE_IPV4 0x0008
#define ETHERNET_TYPE_IPV6 0xdd86

#define CXPLAT_ETHERNET_TYPE_IPV4 0x0008
#define CXPLAT_ETHERNET_TYPE_IPV6 0xdd86
Loading

0 comments on commit fa78ca9

Please sign in to comment.