Skip to content

Commit

Permalink
feat(core): add sampling (#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Jan 30, 2025
1 parent fdc6d15 commit 544316b
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 4 deletions.
47 changes: 47 additions & 0 deletions integration-test/langfuse-integration-fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,53 @@ describe("Langfuse (fetch)", () => {
});
});

it("sampleRate can be passed as constructor arg", () => {
  // The constructor option should be stored on the client unchanged.
  const expectedRate = 0.5;
  const client = new Langfuse({ sampleRate: expectedRate });

  // `sampleRate` is private, hence the cast to reach it from the test.
  const storedRate = (client as any).sampleRate;
  expect(storedRate).toBe(expectedRate);
});

it("sampleRate can be passed as environment variable", () => {
  // Remember any pre-existing value so this test leaves the process environment untouched.
  const previousValue = process.env.LANGFUSE_SAMPLE_RATE;
  process.env.LANGFUSE_SAMPLE_RATE = "0.5";

  try {
    const langfuse = new Langfuse();

    // The string from the environment is expected to be stored as a number.
    expect((langfuse as any).sampleRate).toBe(0.5);
  } finally {
    // Clean up even when the assertion fails; otherwise the variable would leak
    // into every test that runs afterwards in the same process.
    if (previousValue === undefined) {
      delete process.env.LANGFUSE_SAMPLE_RATE;
    } else {
      process.env.LANGFUSE_SAMPLE_RATE = previousValue;
    }
  }
});

it("should sample trace Ids correctly", async () => {
  // Fixed IDs keep the sampling decision reproducible from run to run.
  const traceIdOutSample = "test-trace-out-sample"; // Deterministic hash: 0.92
  const traceIdInSample = "test-trace-in-the-sample"; // Deterministic hash: 0.02

  const langfuse = new Langfuse({ sampleRate: 0.5 });
  langfuse.debug();

  const inSampleTrace = langfuse.trace({ id: traceIdInSample, name: traceIdInSample });
  inSampleTrace.span({ name: "span" });
  inSampleTrace.generation({ name: "generation" });

  const outSampleTrace = langfuse.trace({ id: traceIdOutSample, name: traceIdOutSample });
  outSampleTrace.span({ name: "span" });
  outSampleTrace.generation({ name: "generation" });

  await langfuse.flushAsync();

  // `.rejects` returns a promise. Without `await` the test would finish before the
  // assertion settles and could never fail on an out-of-sample trace being ingested.
  await expect(
    (await getAxiosClient()).get(`${LANGFUSE_BASEURL}/api/public/traces/${traceIdOutSample}`, {
      headers: getHeaders(),
    })
  ).rejects.toThrow();

  const fetchedInSampleTrace = await (
    await getAxiosClient()
  ).get(`${LANGFUSE_BASEURL}/api/public/traces/${traceIdInSample}`, {
    headers: getHeaders(),
  });

  expect(fetchedInSampleTrace.data.id).toBe(traceIdInSample);
}, 10_000);

it("should mask data in the event body", async () => {
const mask = ({ data }: { data: any }): string =>
typeof data === "string" && data.includes("confidential") ? "MASKED" : data;
Expand Down
29 changes: 27 additions & 2 deletions langfuse-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import {
type UpdateLangfuseGenerationBody,
type UpdateLangfuseSpanBody,
type GetMediaResponse,
UpdatePromptBody,
type UpdatePromptBody,
} from "./types";
import { LangfuseMedia, type LangfuseMediaResolveMediaReferencesParams } from "./media/LangfuseMedia";
import {
Expand All @@ -69,6 +69,7 @@ import {
safeSetTimeout,
type RetriableOptions,
} from "./utils";
import { isInSample } from "./sampling";

export * from "./prompts/promptClients";
export * from "./media/LangfuseMedia";
Expand Down Expand Up @@ -189,6 +190,7 @@ abstract class LangfuseCoreStateless {
private localEventExportMap: Map<string, SingleIngestionEvent[]> = new Map();
private projectId: string | undefined;
private mask: MaskFunction | undefined;
private sampleRate: number | undefined;

// internal
protected _events = new SimpleEventEmitter();
Expand Down Expand Up @@ -216,6 +218,12 @@ abstract class LangfuseCoreStateless {
this.flushInterval = options?.flushInterval ?? 10000;
this.release = options?.release ?? getEnv("LANGFUSE_RELEASE") ?? getCommonReleaseEnvs() ?? undefined;
this.mask = options?.mask;
this.sampleRate =
options?.sampleRate ?? (getEnv("LANGFUSE_SAMPLE_RATE") ? Number(getEnv("LANGFUSE_SAMPLE_RATE")) : undefined);

if (this.sampleRate) {
this._events.emit("debug", `Langfuse trace sampling enabled with sampleRate ${this.sampleRate}.`);
}

this._retryOptions = {
retryCount: options?.fetchRetryCount ?? 3,
Expand Down Expand Up @@ -614,6 +622,19 @@ abstract class LangfuseCoreStateless {
return;
}

// Sampling
const traceId = this.parseTraceId(type, body);
if (!traceId) {
this._events.emit(
"warning",
"Failed to parse traceID for sampling. Please open a Github issue in https://github.com/langfuse/langfuse/issues/new/choose"
);
} else if (!isInSample(traceId, this.sampleRate)) {
this._events.emit("debug", `Event with trace ID ${traceId} is out of sample. Skipping.`);

return;
}

const promise = this.processEnqueueEvent(type, body);
const promiseId = generateUUID();
this.pendingEventProcessingPromises[promiseId] = promise;
Expand Down Expand Up @@ -734,7 +755,7 @@ abstract class LangfuseCoreStateless {
return;
}

const traceId = "traceId" in body ? body.traceId : type.includes("trace") ? body.id : undefined;
const traceId = this.parseTraceId(type, body);

if (!traceId) {
this._events.emit("warning", "traceId is required for media upload");
Expand All @@ -760,6 +781,10 @@ abstract class LangfuseCoreStateless {
);
}

/**
 * Resolves the trace ID that an event refers to.
 *
 * @param type - Event type; a type containing "trace" means the body's own `id` is the trace ID.
 * @param body - Event payload to inspect.
 * @returns `body.traceId` when the body has a `traceId` key, otherwise `body.id` for
 *   trace-type events, otherwise `undefined`.
 */
protected parseTraceId(type: LangfuseObject, body: EventBody): string | null | undefined {
  // An explicit trace reference on the body always wins.
  if ("traceId" in body) {
    return body.traceId;
  }

  // For trace events the body's own ID is the trace ID.
  if (type.includes("trace")) {
    return body.id;
  }

  return undefined;
}

protected async findAndProcessMedia({
data,
traceId,
Expand Down
4 changes: 2 additions & 2 deletions langfuse-core/src/openapi/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1006,10 +1006,10 @@ export interface components {
*/
ObservationLevel: "DEBUG" | "DEFAULT" | "WARNING" | "ERROR";
/** MapValue */
MapValue: (string | null) | (number | null) | (boolean | null) | (string[] | null) | undefined; /**
MapValue: (string | null) | (number | null) | (boolean | null) | (string[] | null) | undefined /**
* CommentObjectType
* @enum {string}
*/
*/;
CommentObjectType: "TRACE" | "OBSERVATION" | "SESSION" | "PROMPT";
/**
* DatasetStatus
Expand Down
39 changes: 39 additions & 0 deletions langfuse-core/src/sampling.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
 * A consistent sampling function that works for arbitrary strings across all JavaScript runtimes.
 *
 * The verdict depends only on `input` and `sampleRate`, so repeated calls with the same
 * arguments always agree.
 *
 * @param input - String to hash, for example a trace ID.
 * @param sampleRate - Fraction of inputs to keep, between 0 and 1 inclusive.
 *   `undefined` disables sampling, so everything is kept.
 * @returns `true` when the input is in the sample, `false` when it should be dropped.
 *   An invalid rate (negative, greater than 1, or NaN) logs a warning and returns `true`.
 */
export function isInSample(input: string, sampleRate: number | undefined): boolean {
  if (sampleRate === undefined) {
    return true;
  }

  if (sampleRate === 0) {
    return false;
  }

  if (Number.isNaN(sampleRate) || sampleRate < 0 || sampleRate > 1) {
    console.warn("Sample rate must be between 0 and 1. Ignoring setting.");

    return true;
  }

  // simpleHash can return exactly 1 (or marginally more), so a strict `< 1` comparison
  // would drop those inputs. A rate of 1 must keep everything without exception.
  if (sampleRate === 1) {
    return true;
  }

  return simpleHash(input) < sampleRate;
}

/**
 * Simple and consistent string hashing function.
 * Uses character codes and prime numbers for good distribution.
 *
 * The arithmetic must stay exactly as written: any change to it changes the hash
 * values and therefore which inputs fall inside a given sample.
 *
 * @param str - String to hash; iterated by UTF-16 code unit.
 * @returns A non-negative number, normally in [0, 1]. It can reach 1 or slightly above
 *   because the divisor is 2^31 - 1 while the absolute value can be as large as 2^31.
 */
function simpleHash(str: string): number {
  let hash = 0;
  const prime = 31;

  for (let i = 0; i < str.length; i++) {
    // Use rolling multiplication instead of Math.pow; `>>> 0` keeps the value an unsigned 32-bit integer
    hash = (hash * prime + str.charCodeAt(i)) >>> 0;
  }

  // Use bit operations for better distribution
  hash = ((hash >>> 16) ^ hash) * 0x45d9f3b;
  hash = ((hash >>> 16) ^ hash) * 0x45d9f3b;
  hash = (hash >>> 16) ^ hash;

  // The XOR above yields a signed 32-bit integer, hence the Math.abs before normalising.
  return Math.abs(hash) / 0x7fffffff;
}
2 changes: 2 additions & 0 deletions langfuse-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export type LangfuseCoreOptions = {
enabled?: boolean;
// Mask function to mask data in the event body
mask?: MaskFunction;
// Trace sampling rate as a fraction between 0 and 1. Approximately sampleRate * 100 % of traces will be sent to Langfuse servers
sampleRate?: number;
// Project ID to use for the SDK in admin mode. This should never be set by users.
_projectId?: string;
// Whether to enable local event export. Defaults to false.
Expand Down
170 changes: 170 additions & 0 deletions langfuse-core/test/langfuse.sampling.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { isInSample } from "../src/sampling";

/**
 * Unit tests for `isInSample`: boundary sample rates, determinism, distribution,
 * unusual inputs, UUID-shaped IDs and a coarse performance guard.
 */
describe("isInSample", () => {
  describe("edge cases", () => {
    it("should always return true when sample rate is undefined", () => {
      expect(isInSample("test-string", undefined)).toBe(true);
      expect(isInSample("", undefined)).toBe(true);
      expect(isInSample("12345", undefined)).toBe(true);
    });

    it("should always return false when sample rate is 0", () => {
      expect(isInSample("test-string", 0)).toBe(false);
      expect(isInSample("", 0)).toBe(false);
      expect(isInSample("12345", 0)).toBe(false);
    });

    it("should always return true when sample rate is 1", () => {
      expect(isInSample("test-string", 1)).toBe(true);
      expect(isInSample("", 1)).toBe(true);
      expect(isInSample("12345", 1)).toBe(true);
    });

    it("should log warning and return true for invalid sample rates", () => {
      const consoleSpy = jest.spyOn(console, "warn").mockImplementation();

      try {
        expect(isInSample("test", -0.5)).toBe(true);
        expect(consoleSpy).toHaveBeenCalled();

        consoleSpy.mockClear();
        expect(isInSample("test", 1.5)).toBe(true);
        expect(consoleSpy).toHaveBeenCalled();

        // NaN is what an unparsable LANGFUSE_SAMPLE_RATE turns into, so cover it as well.
        consoleSpy.mockClear();
        expect(isInSample("test", Number.NaN)).toBe(true);
        expect(consoleSpy).toHaveBeenCalled();
      } finally {
        // Restore console.warn even if an assertion above throws.
        consoleSpy.mockRestore();
      }
    });
  });

  describe("consistency", () => {
    it("should return consistent results for the same input", () => {
      const testString = "test-consistency";
      const sampleRate = 0.5;
      const firstResult = isInSample(testString, sampleRate);

      for (let i = 0; i < 100; i++) {
        expect(isInSample(testString, sampleRate)).toBe(firstResult);
      }
    });

    it("should return different results for different strings", () => {
      const sampleRate = 0.5;
      const results = new Set();

      ["test1", "test2", "test3", "different", "strings"].forEach((str) => {
        results.add(isInSample(str, sampleRate));
      });

      // At least one string must land on each side of the threshold.
      expect(results.size).toBeGreaterThan(1);
    });
  });

  describe("distribution", () => {
    it("should roughly match the sample rate for large numbers of inputs", () => {
      const sampleRate = 0.3;
      let inSampleCount = 0;
      const totalTests = 10_000;

      // Generate random strings and count how many are in sample
      for (let i = 0; i < totalTests; i++) {
        const testString = `test-${Math.random()}-${i}`;
        if (isInSample(testString, sampleRate)) {
          inSampleCount++;
        }
      }

      const actualRate = inSampleCount / totalTests;
      // Allow an absolute deviation of 0.05 from the expected rate
      expect(Math.abs(actualRate - sampleRate)).toBeLessThan(0.05);
    });
  });

  describe("special inputs", () => {
    it("should handle empty strings", () => {
      const sampleRate = 0.5;
      // Just verify it doesn't throw and returns a boolean
      const result = isInSample("", sampleRate);
      expect(typeof result).toBe("boolean");
    });

    it("should handle special characters", () => {
      const sampleRate = 0.5;
      const specialChars = "!@#$%^&*()_+-=[]{}|;:,.<>?`~";
      // Just verify it doesn't throw and returns a boolean
      const result = isInSample(specialChars, sampleRate);
      expect(typeof result).toBe("boolean");
    });

    it("should handle very long strings", () => {
      const sampleRate = 0.5;
      const longString = "a".repeat(10000);
      // Just verify it doesn't throw and returns a boolean
      const result = isInSample(longString, sampleRate);
      expect(typeof result).toBe("boolean");
    });

    it("should handle unicode characters", () => {
      const sampleRate = 0.5;
      const unicodeString = "你好世界😀🌍👋";
      // Just verify it doesn't throw and returns a boolean
      const result = isInSample(unicodeString, sampleRate);
      expect(typeof result).toBe("boolean");
    });
  });

  // UUID specific tests
  describe("UUID handling", () => {
    it("should maintain consistent sampling across different UUID versions", () => {
      const sampleRate = 0.5;
      const uuidV4 = "123e4567-e89b-12d3-a456-426614174000";
      const uuidV1 = "123e4567-e89b-11d3-a456-426614174000";

      // Store initial results
      const v4Result = isInSample(uuidV4, sampleRate);
      const v1Result = isInSample(uuidV1, sampleRate);

      // Test consistency 100 times
      for (let i = 0; i < 100; i++) {
        expect(isInSample(uuidV4, sampleRate)).toBe(v4Result);
        expect(isInSample(uuidV1, sampleRate)).toBe(v1Result);
      }
    });

    it("should handle sequential UUIDs appropriately", () => {
      const sampleRate = 0.5;
      const results = new Set();

      // Generate 1000 sequential UUIDs
      for (let i = 0; i < 1000; i++) {
        const sequentialUUID = `00000000-0000-4000-8000-${i.toString().padStart(12, "0")}`;
        results.add(isInSample(sequentialUUID, sampleRate));
      }

      // Ensure we got both true and false results
      expect(results.size).toBe(2);
    });
  });

  describe("performance", () => {
    // Helper to measure execution time
    const measureExecutionTime = (fn: () => void): number => {
      const start = performance.now();
      fn();
      return performance.now() - start;
    };

    it("should handle high-frequency calls efficiently", () => {
      const sampleRate = 0.5;
      const shortString = "test-string";
      const iterations = 100_000;

      const executionTime = measureExecutionTime(() => {
        for (let i = 0; i < iterations; i++) {
          isInSample(shortString, sampleRate);
        }
      });

      // Average time per call should be less than 0.01ms (10 microseconds)
      const avgTimePerCall = executionTime / iterations;
      expect(avgTimePerCall).toBeLessThan(0.01);
    });
  });
});

0 comments on commit 544316b

Please sign in to comment.