Skip to content

Commit

Permalink
Improved markdown stream parser performance
Browse files Browse the repository at this point in the history
  • Loading branch information
salmenus committed Jun 5, 2024
1 parent b8ae07a commit 1421d8a
Show file tree
Hide file tree
Showing 27 changed files with 230 additions and 86 deletions.
3 changes: 2 additions & 1 deletion packages/extra/highlighter/src/ext/highlightJsExtension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ const createExtension = (): HighlighterExtension => {
}

const highlighterFunction: Highlighter = (code: string, language: string) => {
const languageToUse = hljs.getLanguage(language) ? language : 'plaintext';
const result = hljs.highlight(code, {
language: language,
language: languageToUse,
ignoreIllegals: true,
});

Expand Down
2 changes: 1 addition & 1 deletion packages/js/core/src/aiChat/renderer/renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export class NluxRenderer<AiMsg> {
markdownLinkTarget: this.theMessageOptions?.markdownLinkTarget,
showCodeBlockCopyButton: this.theMessageOptions?.showCodeBlockCopyButton,
streamingAnimationSpeed: this.theMessageOptions?.streamingAnimationSpeed,
skipStreamingAnimation: false,
skipStreamingAnimation: this.theMessageOptions?.skipStreamingAnimation,
composer: {
placeholder: this.theComposerOptions?.placeholder,
autoFocus: this.theComposerOptions?.autoFocus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ export const useSubmitPromptHandler = <AiMsg>(props: SubmitPromptHandlerProps<Ai

if (streamedMessageIds.size > 0) {
streamedMessageIds.forEach((messageId) => {
conversationRef.current?.completeStream(chatSegmentObservable.segmentId, messageId);
requestAnimationFrame(() => {
conversationRef.current?.completeStream(chatSegmentObservable.segmentId, messageId);
});
});

streamedMessageIds.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {FC, ReactElement, Ref, RefObject, useEffect, useImperativeHandle, useMemo, useRef, useState} from 'react';
import {createMarkdownStreamParser, MarkdownStreamParser} from '../../../../../extra/markdown/src';
import {className as compMessageClassName} from '@shared/components/Message/create';
import {
directionClassName as compMessageDirectionClassName,
Expand All @@ -9,6 +8,8 @@ import {warn} from '@shared/utils/warn';
import {StreamResponseComponentProps} from '../../exports/messageOptions';
import {StreamContainerImperativeProps, StreamContainerProps} from './props';
import {streamingDomService} from './streamingDomService';
import {createMdStreamRenderer} from '@shared/markdown/stream/streamParser';
import {StandardStreamParserOutput} from '@shared/types/markdown/streamParser';

export const StreamContainerComp = function <AiMsg>(
props: StreamContainerProps<AiMsg>,
Expand All @@ -26,7 +27,7 @@ export const StreamContainerComp = function <AiMsg>(
// rendering cycle, we don't want to trigger re-renders on every chunk of data received.
const rootElRef = useRef<HTMLDivElement | null>(null);
const rootElRefPreviousValue = useRef<HTMLDivElement | null>(null);
const mdStreamParserRef = useRef<MarkdownStreamParser | null>(null);
const mdStreamParserRef = useRef<StandardStreamParserOutput | null>(null);

const [streamContainer, setStreamContainer] = useState<HTMLDivElement>();
const [initialMarkdownMessageParsed, setInitialMarkdownMessageParsed] = useState(false);
Expand All @@ -50,7 +51,7 @@ export const StreamContainerComp = function <AiMsg>(
// We update the stream parser when key options (markdownLinkTarget, syntaxHighlighter, etc.) change.
useEffect(() => {
const element = streamingDomService.getStreamingDomElement(uid);
mdStreamParserRef.current = createMarkdownStreamParser(element, {
mdStreamParserRef.current = createMdStreamRenderer(element, {
syntaxHighlighter: markdownOptions?.syntaxHighlighter,
htmlSanitizer: markdownOptions?.htmlSanitizer,
markdownLinkTarget: markdownOptions?.markdownLinkTarget,
Expand Down Expand Up @@ -99,7 +100,6 @@ export const StreamContainerComp = function <AiMsg>(
useEffect(() => {
return () => {
rootElRefPreviousValue.current = null;
mdStreamParserRef.current?.complete();
mdStreamParserRef.current = null;
setStreamContainer(undefined);
};
Expand All @@ -113,7 +113,9 @@ export const StreamContainerComp = function <AiMsg>(
warn('When using a markdown stream renderer, the chunk must be a string.');
}
},
completeStream: () => mdStreamParserRef.current?.complete(),
completeStream: () => {
mdStreamParserRef.current?.complete();
},
}), []);

const compDirectionClassName = compMessageDirectionClassName['received'];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ export const createMessageContent = (
if (format === 'markdown') {
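// Render the parsed markdown as HTML inside a detached element.
// NOTE(review): this assigns innerHTML (not a text node), so XSS safety depends on sanitization — confirm parseMdSnapshot applies markdownOptions.htmlSanitizer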
const htmlElement = document.createElement('div');
const parsedMarkdown = parseMdSnapshot(message, markdownOptions);
htmlElement.innerHTML = markdownOptions?.htmlSanitizer
? markdownOptions.htmlSanitizer(parsedMarkdown)
: parsedMarkdown;

htmlElement.innerHTML = parseMdSnapshot(message, markdownOptions);
attachCopyClickListener(htmlElement);

const fragment = document.createDocumentFragment();
Expand Down
108 changes: 82 additions & 26 deletions packages/shared/src/markdown/stream/streamParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,101 @@ import {StandardStreamParser} from '../../types/markdown/streamParser';
import {warn} from '../../utils/warn';
import {parseMdSnapshot} from '../snapshot/snapshotParser';

const delayBeforeComplete = 2000;
const defaultDelayInMsBeforeComplete = 2000;
const defaultDelayInMsBetweenBufferChecks = 2;

/**
 * Selects the function used to defer a callback.
 *
 * @param type - `'timeout'` defers with `setTimeout(callback, 0)`; any other value
 * defers with `requestAnimationFrame(callback)`.
 * @returns A function that takes a callback, schedules it with the selected
 * mechanism, and returns whatever handle that mechanism returns.
 */
const getScheduler = (type: 'timeout' | 'animationFrame') => {
    // Both strategies are declared up front; only the one returned is ever invoked.
    const deferWithTimeout = (callback: () => void) => setTimeout(callback, 0);
    const deferWithAnimationFrame = (callback: () => void) => requestAnimationFrame(callback);

    return type === 'timeout' ? deferWithTimeout : deferWithAnimationFrame;
};

export const createMdStreamRenderer: StandardStreamParser = (
root: HTMLElement,
options,
) => {
let isComplete = false;
const {onComplete, htmlSanitizer} = options || {};
let streamIsComplete = false;

const letterByLetter = true;
const {onComplete} = options || {};

//
// Buffer to store the chunks of markdown to be parsed
// Scheduler to control the speed of the streaming animation
//
const buffer: string[] = [];
const scheduler = getScheduler(
options?.skipStreamingAnimation ? 'timeout' : 'animationFrame',
);

//
// Container for markdown being parsed and that can be updated
//
const wipContainer = document.createElement('div');
wipContainer.classList.add('md-in-progress');
root.appendChild(wipContainer);
root.append(wipContainer);

//
// Functions to commit the WIP content to the DOM
// And to complete the parsing
//
const commitWipContent = () => {
while (wipContainer.firstChild) {
root.appendChild(wipContainer.firstChild);
wipContainer.before(wipContainer.firstChild);
}
};

const completeParsing = () => {
isComplete = true;
streamIsComplete = true;
if (parsingInterval) {
clearInterval(parsingInterval);
parsingInterval = undefined;
}

commitWipContent();
wipContainer.remove();
onComplete?.();
};

const markAsComplete = () => setTimeout(completeParsing, delayBeforeComplete);

let timeSinceLastProcessing: number | undefined = undefined;
let currentMarkdown = '';
let previousHtml: string | undefined = undefined;
let completeStreamTimer = markAsComplete();

return {
next: (chunk: string) => {
if (isComplete) {
warn('Stream is closed. Chunk will be ignored');
return;
const parsingIntervalDelay = (
!options?.skipStreamingAnimation && options?.streamingAnimationSpeed && options.streamingAnimationSpeed >= 0
) ? options.streamingAnimationSpeed : defaultDelayInMsBetweenBufferChecks;

let parsingInterval: number | undefined = setInterval(() => {
const nowTime = new Date().getTime();
if (buffer.length === 0) {
if (
streamIsComplete
|| (timeSinceLastProcessing && nowTime - timeSinceLastProcessing > defaultDelayInMsBeforeComplete)
) {
completeParsing();
}

// Set the stream to complete after 2 seconds of no new chunks
clearTimeout(completeStreamTimer);
completeStreamTimer = markAsComplete();
return;
}

timeSinceLastProcessing = nowTime;
const chunk = buffer.shift();
if (!chunk) {
return;
}

scheduler(() => {
// We should only parse the last chunk (in its relevant context) instead of the whole text
// In order to do that, we need to distinguish between:
// - WIP text that is still being parsed and may be incomplete (example: `# Hello, `)
// - Text that is committed to the DOM and will not change (example: `# Hello World!\n\n`)

// Append the new chunk to the raw text and parse
const markdownToParse = currentMarkdown + chunk;
const parsedHtml = parseMdSnapshot(markdownToParse, options);
const parsedHtml = parseMdSnapshot(markdownToParse, options).trim();

if (typeof parsedHtml !== 'string') {
// Remove the last chunk if parsing failed
currentMarkdown = currentMarkdown.slice(0, -chunk.length);
Expand All @@ -69,14 +110,13 @@ export const createMdStreamRenderer: StandardStreamParser = (
// Which means that the last parsed markdown is complete and should be committed to the DOM
// Commit the last parsed content to the DOM

for (let i = 0; i < wipContainer.children.length; i++) {
wipContainer.insertAdjacentElement('beforebegin', wipContainer.children[i]);
while (wipContainer.children.length > 0) {
wipContainer.before(wipContainer.children[0]);
}

// Extract new HTML and insert it into WIP container
const currentHtml = parsedHtml.slice(previousHtml.length);
const trimmedHtml = currentHtml.trim();
wipContainer.innerHTML = htmlSanitizer ? htmlSanitizer(trimmedHtml) : trimmedHtml;
const currentHtml = parsedHtml.slice(previousHtml.length).trim();
wipContainer.innerHTML = options?.htmlSanitizer ? options.htmlSanitizer(currentHtml) : currentHtml;

// Focus on everything that is new
currentMarkdown = chunk;
Expand All @@ -86,16 +126,32 @@ export const createMdStreamRenderer: StandardStreamParser = (
// This means that new chunk goes inside previous HTML and no root level changes

// Append the new chunk to the current markdown
const trimmedHtml = parsedHtml.trim();
wipContainer.innerHTML = htmlSanitizer ? htmlSanitizer(trimmedHtml) : trimmedHtml;
wipContainer.innerHTML = options?.htmlSanitizer ? options.htmlSanitizer(parsedHtml) : parsedHtml;

// Update the current markdown and previous HTML for the next iteration
currentMarkdown = markdownToParse;
previousHtml = parsedHtml;
}
});
}, parsingIntervalDelay) as unknown as number;

return {
next: (chunk: string) => {
if (streamIsComplete) {
warn('Stream is already complete. No more chunks can be added');
return;
}

if (letterByLetter) {
for (const char of chunk) {
buffer.push(char);
}
} else {
buffer.push(chunk);
}
},
complete: () => {
completeParsing();
streamIsComplete = true;
},
error: () => {
// No error handling for now
Expand Down
2 changes: 1 addition & 1 deletion pipeline/npm/versions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"inherit": true,
"nlux": "2.3.7",
"nlux": "2.4.0",
"peerDependencies": {
"react": "18",
"react-dom": "18"
Expand Down
10 changes: 6 additions & 4 deletions samples/aiChat/react/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ function App() {
}, []);

const conversationStarters: ConversationStarter[] = [
{prompt: 'Hello, World!'},
{prompt: 'How are you?'},
{prompt: 'Write Hello World in Python, C++, and Java.'},
{prompt: 'Write hello world in Python.'},
{prompt: 'Write a poem using markdown and emojis'},
{prompt: 'What is your name?'},
{prompt: 'What is your favorite color?'},
];
Expand Down Expand Up @@ -275,8 +276,9 @@ function App() {
markdownLinkTarget: 'blank',
syntaxHighlighter: highlighter,
htmlSanitizer: htmlSanitizer,
// showCodeBlockCopyButton: false,
// streamingAnimationSpeed: 100,
showCodeBlockCopyButton: true,
streamingAnimationSpeed: 100,
skipStreamingAnimation: true,
responseRenderer: useCustomResponseComponent ? responseRenderer : undefined,
promptRenderer: undefined,
}}
Expand Down
8 changes: 6 additions & 2 deletions specs/specs/adapters/js/extras-conversationHistory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {waitForMdStreamToComplete, waitForRenderCycle} from '../../../utils/wait

describe.each([
{dataTransferMode: 'batch'},
{dataTransferMode: 'stream'},
// {dataTransferMode: 'stream'},
] satisfies Array<{ dataTransferMode: 'stream' | 'batch' }>,
)('createAiChat() + withAdapter($mode) + conversationHistory extras', ({dataTransferMode}) => {
let adapterController: AdapterController;
Expand Down Expand Up @@ -67,7 +67,11 @@ describe.each([

aiChat = createAiChat()
.withAdapter(adapterController!.adapter)
.withInitialConversation(initialConversation);
.withInitialConversation(initialConversation)
.withMessageOptions({
skipStreamingAnimation: true,
streamingAnimationSpeed: 0,
});

aiChat.mount(rootElement);
await waitForRenderCycle();
Expand Down
8 changes: 7 additions & 1 deletion specs/specs/adapters/js/prop-adapter-batchAdapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ describe('createAiChat() + withAdapter(batchAdapter)', () => {

it('Text returned from batchText should be displayed in the conversation', async () => {
// Arrange
aiChat = createAiChat().withAdapter(adapterController!.adapter);
aiChat = createAiChat()
.withAdapter(adapterController!.adapter)
.withMessageOptions({
skipStreamingAnimation: true,
streamingAnimationSpeed: 0,
});

aiChat.mount(rootElement);
await waitForRenderCycle();

Expand Down
8 changes: 7 additions & 1 deletion specs/specs/aiChat/behavior/js/markdown-batch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ describe('createAiChat() + batch adapter + markdown', () => {
describe('When markdown is being fetched', () => {
it('Should be rendered correctly', async () => {
// Arrange
aiChat = createAiChat().withAdapter(adapterController!.adapter);
aiChat = createAiChat()
.withAdapter(adapterController!.adapter)
.withMessageOptions({
skipStreamingAnimation: true,
streamingAnimationSpeed: 0,
});

aiChat.mount(rootElement);
await waitForRenderCycle();

Expand Down
8 changes: 7 additions & 1 deletion specs/specs/aiChat/behavior/react/markdown-stream.spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ describe('<AiChat /> + stream adapter + markdown', () => {
describe('When markdown is being streamed', () => {
it('Should be rendered correctly', async () => {
// Arrange
const aiChat = <AiChat adapter={adapterController!.adapter}/>;
const aiChat = (
<AiChat
adapter={adapterController!.adapter}
messageOptions={{skipStreamingAnimation: true}}
/>
);

const {container} = render(aiChat);
await waitForReactRenderCycle();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,15 @@ describe('createAiChat() + messageOptions + htmlSanitizer', () => {
// Arrange
aiChat = createAiChat().withAdapter(adapterController!.adapter).withMessageOptions({
// A dummy sanitizer that lowercases the HTML, replaces every 'h' with 'x', and turns <p> tags into <div> tags
htmlSanitizer: (html: string) => html
.toLowerCase()
.replaceAll('h', 'x')
.replaceAll('<p>', '<div>')
.replaceAll('</p>', '</div>'),
htmlSanitizer: (html: string) => {
return html
.toLowerCase()
.replaceAll('h', 'x')
.replaceAll('<p>', '<div>')
.replaceAll('</p>', '</div>');
},
skipStreamingAnimation: true,
streamingAnimationSpeed: 0,
});

aiChat.mount(rootElement);
Expand Down
Loading

0 comments on commit 1421d8a

Please sign in to comment.