Shared state conceptual docs #1958

Open · wants to merge 4 commits into `main`
Changes from 2 commits
4 changes: 3 additions & 1 deletion docs/docs/concepts/memory.md
@@ -160,7 +160,9 @@

```python
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)
```

Persistence is critical for sustaining long-running chat sessions. For example, a chat between a user and an AI assistant may have interruptions. Persistence ensures that a user can continue that particular chat session at any later point in time. However, what happens if a user initiates a new chat session with an assistant? This spawns a new thread, and the information from the previous session (thread) is not retained. This motivates the need for a memory that can maintain data across chat sessions (threads).

For this, we can use LangGraph's `Store` interface to save and retrieve information across threads. Shared information can be namespaced by, for example, `user_id` to retain user-specific information across threads. See more detail in the [persistence conceptual guide](https://langchain-ai.github.io/langgraph/concepts/persistence/#persistence) and this [how-to guide on shared state](../how-tos/memory/shared-state.ipynb).
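
For example, a minimal sketch of that interface (assuming the `InMemoryStore` implementation from `langgraph.store.memory`; the namespace, key, and stored value here are illustrative):

```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# Namespace shared memories by user so they survive across threads
namespace = ("memories", "user-1")
store.put(namespace, "memory-1", {"memory": "User prefers concise answers"})

# Any thread for this user can read them back
for item in store.search(namespace):
    print(item.value)
```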
Contributor comment:

if you have a code snippet for the checkpointer, I would do the same here

also, now that we have this, I would add more concrete examples for the two below

also I'd maybe rename "metaprompting" to "update own instructions" or something?


## Meta-prompting

100 changes: 100 additions & 0 deletions docs/docs/concepts/persistence.md
@@ -216,6 +216,106 @@

The final thing you can optionally specify when calling `update_state` is `as_node`.

![Update](img/persistence/checkpoints_full_story.jpg)

## Memory Store

![Update](img/persistence/shared_state.png)

A state schema specifies a set of keys / channels that are populated as a graph is executed. As discussed above, state can be written by a checkpointer to a thread at each graph step, enabling state persistence.
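
As a minimal sketch of that idea (the schema and node here are illustrative, reusing the `add_messages` reducer and `MemorySaver` that appear elsewhere in these docs):

```python
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages


class State(TypedDict):
    # "messages" is one key / channel of the state; the add_messages
    # reducer appends new messages instead of overwriting the channel
    messages: Annotated[list, add_messages]


def respond(state: State):
    # Illustrative node: a real graph would call a model here
    return {"messages": [("ai", "hello!")]}


builder = StateGraph(State)
builder.add_node("respond", respond)
builder.add_edge(START, "respond")

# The checkpointer writes the state to the thread at each graph step
graph = builder.compile(checkpointer=MemorySaver())
```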

But what if we want to retain some information *across threads*? Consider the case of a chatbot where we want to retain specific information about the user across *all* chat conversations (e.g., threads) with that user!

With checkpointers alone, we cannot share information across threads. This motivates the need for the `Store` interface. As an illustration, we can define an `InMemoryStore` to store information about a user across threads. We simply compile our graph with a checkpointer, as before, and with our new `in_memory_store`.

```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore

# We need this because we want to enable threads (conversations)
checkpointer = MemorySaver()

# This is the in memory store needed to save the memories (i.e. user preferences) across threads
in_memory_store = InMemoryStore()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
```

Now, let's assume that we want to store memories for each user. We invoke the graph with a `thread_id`, as before, and also with a `user_id`, which we'll use to namespace our memories to this particular user.

```python
# Invoke the graph
config = {"configurable": {"thread_id": "1", "user_id": "1"}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)
```

Our memory store is accessible in *any node*. We simply declare `store: BaseStore` as a node argument, and LangGraph passes in the store that the graph was compiled with. We can write to it using `store.put`. We want to write memories associated with the user, which is specified by `user_id` in the config.

Each memory is namespaced by a `tuple`, which in our case will be `("memories", <user_id>)`. We then call `store.put` with a key-value pair: the key is a UUID for the memory, and the value is a dictionary containing whatever we want to store.

```python
import uuid

from langchain_core.runnables import RunnableConfig
from langgraph.graph import MessagesState
from langgraph.store.base import BaseStore


def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # ... Analyze conversation and create a new memory ...
    memory = "..."  # placeholder for the extracted memory

    # Write the new memory, namespaced to this user
    store.put(("memories", user_id), memory_id, {"memory": memory})
```

So we've written memories namespaced to `("memories", <user_id>)`. We can read them back using `store.search`, which will return all memories for a given user as a list.

```python
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Get the memories for the user from the store
    memories = store.search(("memories", user_id))
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
```
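
As an illustrative, hedged completion of that last step (assuming `init_chat_model` from `langchain.chat_models` and a hypothetical model choice), the retrieved memories might be injected as a system message:

```python
from langchain.chat_models import init_chat_model

# Hypothetical model choice, for illustration only
model = init_chat_model("gpt-4o-mini", model_provider="openai")


def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    memories = store.search(("memories", user_id))
    info = "\n".join([d.value["memory"] for d in memories])

    # Prepend the memories as a system message so the model can use them
    system_msg = {"role": "system", "content": f"Known user info:\n{info}"}
    response = model.invoke([system_msg] + state["messages"])
    return {"messages": [response]}
```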

If we create a new thread, we can still access the same memories so long as the `user_id` is the same.

```python
# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)
```

Contributor comment:

The input into the graph is messages in the format of dicts.

I think that the schema of the graph is `MessagesState` -- which doesn't support the dict notation, probably?

```python
class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

AnyMessage = Annotated[
    Union[
        Annotated[AIMessage, Tag(tag="ai")],
        Annotated[HumanMessage, Tag(tag="human")],
        Annotated[ChatMessage, Tag(tag="chat")],
        Annotated[SystemMessage, Tag(tag="system")],
        Annotated[FunctionMessage, Tag(tag="function")],
        Annotated[ToolMessage, Tag(tag="tool")],
        Annotated[AIMessageChunk, Tag(tag="AIMessageChunk")],
        Annotated[HumanMessageChunk, Tag(tag="HumanMessageChunk")],
        Annotated[ChatMessageChunk, Tag(tag="ChatMessageChunk")],
        Annotated[SystemMessageChunk, Tag(tag="SystemMessageChunk")],
        Annotated[FunctionMessageChunk, Tag(tag="FunctionMessageChunk")],
        Annotated[ToolMessageChunk, Tag(tag="ToolMessageChunk")],
    ],
    Field(discriminator=Discriminator(_get_type)),
]
```

Contributor comment:

cc @vbarda / @hinthornw IMO this is a problem if we start using dict notation

Contributor comment (@hinthornw, Oct 2, 2024):

I just ran into mypy anger from this type of thing, where `BaseMessage` isn't included either.

I think we can use `from langgraph.graph.message import Messages`, which is a union of that and "messagelikedict" or something.

It's frustrating that core doesn't have a single type for this.
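
(For context, a small sketch of the runtime behavior under discussion -- `add_messages` from `langgraph.graph.message` coerces dict-shaped inputs into message objects at runtime, which is what the typing question above is about:)

```python
from langgraph.graph.message import add_messages

# Dict notation is coerced to a HumanMessage at runtime, even though the
# static annotation on MessagesState is list[AnyMessage]
merged = add_messages([], [{"role": "user", "content": "hi"}])
print(merged)  # [HumanMessage(content='hi')]
```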

In addition, we can always access the store outside of the graph execution.

```python
for memory in in_memory_store.search(("memories", "1")):
    print(memory.value)
```

When we use the LangGraph API, either locally (e.g., in LangGraph Studio) or with LangGraph Cloud, the memory store is available to use by default and does not need to be specified during graph compilation.
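
(A sketch of what that means in practice; `builder` is a hypothetical `StateGraph` as above:)

```python
# When deployed via the LangGraph API, persistence and the memory store
# are provided by the platform, so neither needs to be passed here
graph = builder.compile()
```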

See our [how-to guide on shared state](../how-tos/memory/shared-state.ipynb) for a detailed example!

## Checkpointer libraries

Under the hood, checkpointing is powered by checkpointer objects that conform to [BaseCheckpointSaver][langgraph.checkpoint.base.BaseCheckpointSaver] interface. LangGraph provides several checkpointer implementations, all implemented via standalone, installable libraries:
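
For instance, a hedged sketch using the SQLite implementation (assuming the `langgraph-checkpoint-sqlite` package; the connection string and `builder` are illustrative):

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Persist checkpoints to a local SQLite database file
with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    graph.invoke(
        {"messages": [{"role": "user", "content": "hi"}]},
        {"configurable": {"thread_id": "1"}},
    )
```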