Skip to content

Commit

Permalink
Remove unused function
Browse files Browse the repository at this point in the history
  • Loading branch information
Taewan-P committed Nov 23, 2024
1 parent 0dffff5 commit dea38f2
Showing 1 changed file with 16 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,15 @@ import dev.chungjungsoo.gptmobile.data.dto.anthropic.request.MessageRequest
import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorDetail
import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.ErrorResponseChunk
import dev.chungjungsoo.gptmobile.data.dto.anthropic.response.MessageResponseChunk
import io.ktor.client.call.body
import io.ktor.client.plugins.sse.sse
import io.ktor.client.request.accept
import io.ktor.client.request.headers
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.http.ContentType
import io.ktor.http.HttpMethod
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.cancel
import io.ktor.utils.io.readUTF8Line
import javax.inject.Inject
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToJsonElement

Expand All @@ -40,52 +32,29 @@ class AnthropicAPIImpl @Inject constructor(
this.apiUrl = url
}

override fun streamChatMessage(messageRequest: MessageRequest): Flow<MessageResponseChunk> {
val body = Json.encodeToJsonElement(messageRequest)

return flow<MessageResponseChunk> {
try {
networkClient()
.sse(
urlString = if (apiUrl.endsWith("/")) "${apiUrl}v1/messages" else "$apiUrl/v1/messages",
request = {
method = HttpMethod.Post
setBody(body)
accept(ContentType.Text.EventStream)
headers {
append(API_KEY_HEADER, token ?: "")
append(VERSION_HEADER, ANTHROPIC_VERSION)
}
override fun streamChatMessage(messageRequest: MessageRequest): Flow<MessageResponseChunk> = flow<MessageResponseChunk> {
try {
networkClient()
.sse(
urlString = if (apiUrl.endsWith("/")) "${apiUrl}v1/messages" else "$apiUrl/v1/messages",
request = {
method = HttpMethod.Post
setBody(Json.encodeToJsonElement(messageRequest))
accept(ContentType.Text.EventStream)
headers {
append(API_KEY_HEADER, token ?: "")
append(VERSION_HEADER, ANTHROPIC_VERSION)
}
) {
incoming.collect { event -> event.data?.let { line -> emit(Json.decodeFromString(line)) } }
}
} catch (e: Exception) {
emit(ErrorResponseChunk(error = ErrorDetail(type = "network_error", message = e.message ?: "")))
}
}
}

private suspend inline fun <reified T> FlowCollector<T>.streamEventsFrom(response: HttpResponse) {
val channel: ByteReadChannel = response.body()
try {
while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
val line = channel.readUTF8Line() ?: continue
val value: T = when {
line.startsWith(STREAM_END_TOKEN) -> break
line.startsWith(STREAM_PREFIX) -> Json.decodeFromString(line.removePrefix(STREAM_PREFIX))
else -> continue
) {
incoming.collect { event -> event.data?.let { line -> emit(Json.decodeFromString(line)) } }
}
emit(value)
}
} finally {
channel.cancel()
} catch (e: Exception) {
emit(ErrorResponseChunk(error = ErrorDetail(type = "network_error", message = e.message ?: "")))
}
}

companion object {
private const val STREAM_PREFIX = "data:"
private const val STREAM_END_TOKEN = "event: message_stop"
private const val API_KEY_HEADER = "x-api-key"
private const val VERSION_HEADER = "anthropic-version"
private const val ANTHROPIC_VERSION = "2023-06-01"
Expand Down

0 comments on commit dea38f2

Please sign in to comment.