Feature/streaming callbacks #389

Open · wants to merge 2 commits into main
Binary file added FEATURE_REQUEST.md
Binary file not shown.
130 changes: 130 additions & 0 deletions callback-demo.js
@@ -0,0 +1,130 @@
/**
 * Demo of streaming Gemini responses with callback helpers.
 */
const { GoogleGenerativeAI } = require("@google/generative-ai");
const { getStreamedResponse, getStreamedChatResponse } = require("./utils/streaming-callbacks");

// Read the API key from the environment (e.g. GEMINI_API_KEY) rather than hardcoding a secret
const API_KEY = process.env.GEMINI_API_KEY;
const genAI = new GoogleGenerativeAI(API_KEY);

/**
 * Example 1: Basic streaming with callbacks
 */
async function basicStreamingWithCallbacks() {
  console.log("\n===== EXAMPLE 1: BASIC STREAMING WITH CALLBACKS =====");

  const model = genAI.getGenerativeModel({ model: "gemini-1.5-flash" });

  const prompt = "Write a short story about an AI assistant helping a developer";
  console.log(`Prompt: ${prompt}`);

  console.log("\nResponse (streaming with callbacks):");

  // Mock database save (stands in for real persistence in this demo)
  const mockSaveToDatabase = async (text) => {
    console.log(`\n[DATABASE] Saved complete response (${text.length} characters)`);
    return true;
  };

  // Count chunks as they arrive
  let chunkCount = 0;

  await getStreamedResponse({
    prompt,
    model,
    onData: (chunkText) => {
      // Print each chunk as it streams in
      chunkCount++;
      process.stdout.write(chunkText);

      // Annotate every fifth chunk with its index
      if (chunkCount % 5 === 0) {
        process.stdout.write(` [Chunk #${chunkCount}]`);
      }
    },
    onEnd: async (fullText) => {
      console.log("\n\n[EVENT] Stream complete!");

      // Persist the complete response once streaming finishes
      await mockSaveToDatabase(fullText);

      console.log(`[STATS] Received ${chunkCount} chunks in total`);
    }
  });
}

/**
* Example 2: Chat streaming with callbacks
*/
async function chatStreamingWithCallbacks() {
  console.log("\n===== EXAMPLE 2: CHAT STREAMING WITH CALLBACKS =====");

  const model = genAI.getGenerativeModel({ model: "gemini-1.5-flash" });

  // Start a chat session seeded with prior conversation history
  const chat = model.startChat({
    history: [
      {
        role: "user",
        parts: [{ text: "I'm developing a JavaScript application. Can you help me?" }]
      },
      {
        role: "model",
        parts: [{ text: "I'd be happy to help with your JavaScript application! What specific aspect are you working on or having trouble with?" }]
      }
    ]
  });

  const message = [{ text: "I need to implement error handling for asynchronous functions. What's the best approach?" }];
  console.log(`User: ${message[0].text}`);

  console.log("\nAI (streaming with callbacks):");

  // Record when streaming starts so we can measure response time
  const startTime = Date.now();

  // Mock analytics tracking function
  const mockTrackAnalytics = async (fullResponse, timing) => {
    console.log(`\n[ANALYTICS] Response generated in ${timing}ms`);
    console.log(`[ANALYTICS] Response length: ${fullResponse.length} characters`);
    return true;
  };

  // Use the getStreamedChatResponse function
  await getStreamedChatResponse({
    message,
    chatSession: chat,
    onData: (chunkText) => {
      // Print each chunk as it streams in
      process.stdout.write(chunkText);
    },
    onEnd: async (fullText) => {
      const duration = Date.now() - startTime;
      console.log("\n\n[EVENT] Chat response complete!");

      // Track analytics when done
      await mockTrackAnalytics(fullText, duration);
    }
  });
}
}

/**
* Run all examples
*/
async function runDemos() {
  try {
    await basicStreamingWithCallbacks();
    await chatStreamingWithCallbacks();

    console.log("\nAll demos completed successfully!");
  } catch (error) {
    console.error("Error running demos:", error);
  }
}

runDemos();
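
To try the demo locally, set the GEMINI_API_KEY environment variable (the name assumed in the demo above) and run node callback-demo.js from the repository root.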
104 changes: 104 additions & 0 deletions utils/streaming-callbacks.js
@@ -0,0 +1,104 @@
/**
* Utility functions for handling streamed responses with callbacks
*/

/**
* Process a streamed response with callback functions
*
* @param {Object} options - Configuration options
* @param {Array|String} options.prompt - The prompt to send to the model
* @param {Object} options.model - The Gemini model instance
* @param {Function} options.onData - Callback called for each chunk of data (receives the text chunk)
* @param {Function} options.onEnd - Callback called when streaming ends (receives the complete response text)
* @param {Object} options.generateOptions - Optional parameters to pass to generateContentStream
* @returns {Promise<string>} - The complete response text
*/
async function getStreamedResponse({
  prompt,
  model,
  onData = () => {},
  onEnd = () => {},
  generateOptions = {}
}) {
  if (!prompt) throw new Error('Prompt is required');
  if (!model) throw new Error('Model is required');

  const response = await model.generateContentStream(prompt, generateOptions);

  let fullResponseText = '';

  try {
    // Consume the stream chunk by chunk, invoking onData for each piece
    for await (const chunk of response.stream) {
      if (chunk) {
        const chunkText = chunk.text();

        if (typeof onData === 'function') {
          onData(chunkText);
        }

        fullResponseText += chunkText;
      }
    }

    // Invoke onEnd once with the accumulated response text
    if (typeof onEnd === 'function') {
      await onEnd(fullResponseText);
    }

    return fullResponseText;
  } catch (error) {
    console.error('Error processing streamed response:', error);
    throw error;
  }
}

/**
* Process a chat streamed response with callback functions
*
* @param {Object} options - Configuration options
* @param {Array|String} options.message - The message to send to the chat session
* @param {Object} options.chatSession - The Gemini chat session instance
* @param {Function} options.onData - Callback called for each chunk of data
* @param {Function} options.onEnd - Callback called when streaming ends
* @returns {Promise<string>} - The complete response text
*/
async function getStreamedChatResponse({
  message,
  chatSession,
  onData = () => {},
  onEnd = () => {}
}) {
  if (!message) throw new Error('Message is required');
  if (!chatSession) throw new Error('Chat session is required');

  const response = await chatSession.sendMessageStream(message);

  let fullResponseText = '';

  try {
    // Consume the stream chunk by chunk, invoking onData for each piece
    for await (const chunk of response.stream) {
      if (chunk) {
        const chunkText = chunk.text();

        if (typeof onData === 'function') {
          onData(chunkText);
        }

        fullResponseText += chunkText;
      }
    }

    // Invoke onEnd once with the accumulated response text
    if (typeof onEnd === 'function') {
      await onEnd(fullResponseText);
    }

    return fullResponseText;
  } catch (error) {
    console.error('Error processing streamed chat response:', error);
    throw error;
  }
}

module.exports = {
  getStreamedResponse,
  getStreamedChatResponse
};
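
A minimal usage sketch of the exported helpers, assuming the API key is supplied via a GEMINI_API_KEY environment variable (the model name and prompt below are illustrative; getStreamedChatResponse follows the same pattern with a chat session instead of a model):

const { GoogleGenerativeAI } = require("@google/generative-ai");
const { getStreamedResponse } = require("./utils/streaming-callbacks");

const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
const model = genAI.getGenerativeModel({ model: "gemini-1.5-flash" });

// Stream a response, printing chunks as they arrive; the resolved value
// is the full accumulated text, so the helper also works without onEnd.
getStreamedResponse({
  prompt: "Explain streaming in one sentence",
  model,
  onData: (chunk) => process.stdout.write(chunk)
})
  .then((fullText) => console.log(`\nTotal length: ${fullText.length} characters`))
  .catch((err) => console.error("Streaming failed:", err));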