Building Complex Multi-Agent Teams and Setups with LangGraph

4.5/5 - (2 votes)

๐Ÿ’ก Info: This course is a complete text tutorial. It’s based on our academy course. If you’re interested in video explainers, check out the course here.

Hi and welcome to this course on building complex multi-agent teams and setups using LangGraph, LangChain, and LangSmith. In this course we’ll start from the ground up using LangChain, and then build and build, adding more complexity and tools as we go along. We will learn how to build a graph with paths, conditional paths, teams, team managers, and more, all stringing our agents together in powerful ways.

  • In part 1, we’ll get started with the basics of LangChain, learning how to create prompt templates and Chains, working with the LangChain syntax to easily string together our LLM calls.
  • In the next part we’ll learn how to write tools so that we can make our future agents powerful by giving them functions they can call. We will use the newest LangChain syntax for this and create both an image generation tool and a weather tool.
  • Part 3 is where we will learn the basics of LangGraph, covering the underlying concepts and exactly how it works. We will learn by setting up our first agent and graph which can return a visual representation of the current weather in any city you name.
  • In part 4 we’ll look at how we can take this all to yet the next level, discussing how we can use all of this to create a whole team of agents working together for us. We’ll also write a tool that can output PDF files in preparation for our multi-agent setup.
  • Part 5 is where the rubber really hits the road and we will create a powerful multi-agent setup in LangGraph using a team, team manager, many agents, conditional paths, and more. We will create a team that can work together independently and create travel itineraries for us, providing them in PDF format with an inserted image and a full travel plan.
  • In the final part we’ll have a look at writing asynchronous tools for our agents and then create a web research and article writing graph that can visit many web pages at the same time and then write an article about our desired topic for us.

I hope you’re as excited as I am to get started. Let’s dive in!

LangChain, LangSmith and LangGraph

Hi and welcome to this course on LangGraph, LangChain, and LangSmith. My name is Dirk van Meerveld and I will be your host and guide as we go on this exploration together.

So what is up with all these Lang-words? Well, in short:

  • LangChain is a basic framework that will allow us to work with LLMs.
  • LangGraph will allow us to make more complex combinations using LangChain by introducing graph structures, where we can have multiple nodes or even teams of LLM agents working together.
  • LangSmith is a tool that helps us see exactly what is going on while we work with the above two, to help us debug and improve our code in a more convenient way.

LangChain

Let’s get started with LangChain๐Ÿ”— first. Langchain is a framework designed to make it easier to build applications that use large language models (LLMs). Think of it as a set of tools that helps bridge the gap between LLMs and the applications you might want to build with them.

LangChain helps us:

  • Provide a unified interface: Any code you write can be used with different LLMs with little modification, and you can use the same code to write prompts or tools for different LLMs.
  • Prebuilt tools for common tasks: Langchain includes tools for common tasks you might want to do with LLMs, such as building chatbots, summarizing documents, or analyzing code. Besides just building our own tools and functions, we can also import community pre-built tools.
  • Memory and Context: Langchain makes it easy to incorporate memory and context into our LLM applications. This means our application can remember past interactions and use that information to inform future responses.

So let’s get started! First go ahead and create a new project folder and name it whatever you like, I’ll call mine FINX_LANGGRAPH:

๐Ÿ“‚ FINX_LANGGRAPH

Create a venv in the root project folder

We’ll be running this project inside a virtual environment. A virtual environment is a self-contained directory that will allow us to install specific versions of packages inside the virtual environment without affecting the global Python installation.

We will use this as I will be using specific versions for the libraries we install as we go along, and I want to make sure that you have the exact same experience as I do.

For example, when we use pydantic we’ll be using the older V1 for this project, as it plays nicely with LangChain. You’ll probably have V2 installed on your system-wide Python installation, and then your imports will be different from mine, causing confusion. We also don’t want to mess with your system-wide Python installation.

The virtual environment will make it easy for you to install my exact versions without worrying about affecting any of your other projects and is a good practice to follow in general.

To create a new virtual environment we’ll use a tool called pipenv. If you don’t have pipenv installed, you can install it using pip, which is Python’s package manager. Run the following command in your terminal:

pip install pipenv

Make sure the terminal is inside your root project folder, e.g. /c/Coding_Vault/Finx_Fine_Tuning, and then run the following command to create a new virtual environment:

pipenv shell

This will create a new virtual environment and also a Pipfile in your project directory. Any packages you install using pipenv install will be added to the Pipfile.

  1. To generate a Pipfile.lock, which is used to produce deterministic builds, run:
pipenv lock

This will create a Pipfile.lock in your project directory, which contains the exact version of each dependency to ensure that future installs are able to replicate the same environment.

We don’t need to install a library first to create a Pipfile.lock. From now on when we install a library in this virtual environment with pipenv install library_name, they will be added to the Pipfile and Pipfile.lock, which are basically just text files keeping track of our exact project dependencies.

For reference, I’m using Python 3.10 for this project, but you should be fine with any recent version. Consider upgrading if you’re using an older version.

Basic project setup

Before we get started, we need to make sure we have our OpenAI API key ready to load in a convenient way, we cannot hardcode this one in our source code. Go to https://platform.openai.com/api-keys and copy your API key, or make a new one. You’ll only pay for what you use which will be cents if you just play around with it casually. Then create a new file called .env in the root folder of your project:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“„ .env             โœจNew file
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock

And paste your API key in the .env file like this, making sure not to use any spaces or quotes:

OPENAI_API_KEY=your_api_key_here

Then go ahead and save and close this file. If you are using Git, make sure to add this file to your .gitignore file so you don’t accidentally commit your API key to your repository. If you’re not using Git, just make sure you exclude the .env file if you share your code with anyone.

We’ll be using several API keys and settings across our project, adding more as we go, so let’s create a simple and reusable way to load them to stop us from writing the same code over and over again.

Run the following command in your terminal to add the python-decouple package inside your pipenv environment:

pipenv install python-decouple==3.7

We will use this package to read the .env file and get the API key from it. Now create a new file named setup_environment.py in the root folder of your project:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“„ .env
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py  โœจNew file

Then inside this new setup_environment.py file, write the following code:

import os

from decouple import config


def set_environment_variables() -> None:
    os.environ["OPENAI_API_KEY"] = str(config("OPENAI_API_KEY"))

We import the os and config from the decouple package we just installed a minute ago. We then create a function we can import from our other code files.

The config("OPENAI_API_KEY") function reads the .env file and gets the value of the OPENAI_API_KEY variable we set in there, so make sure you have used the exact same name in there. The str() cast just makes sure it’s a string value. We then set this value to the OPENAI_API_KEY environment variable using os.environ.

This way we can just use LangChain freely without having to worry about our API key as both LangChain and OpenAI are set up to read our API keys from the environment variables automatically.

LangChain basics

Ok, time to get started with LangChain! Let’s cover the basics first so we understand the building blocks. We’ll start with some installs. Make sure you run all of these even if you have some of these libraries installed already as we’re not using the global Python installation but our virtual environment. Run the following command in your terminal:

pipenv install openai==1.14.2 langchain==0.1.13 langchain-openai==0.1.0

The openai library will work with the OpenAI API behind the scenes while we use langchain and the langchain-openai library has some functionality that overlaps both.

Now create a new file named langchain_basics.py in the root folder of your project:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py  โœจNew file
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py

Inside this new langchain_basics.py file, let’s get started with the following imports:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from setup_environment import set_environment_variables

Before we explain the imports, I want to cover a potential problem you may have here. You may have the following problem where the imports are not recognized and have red squiggly lines under them even though you just installed these libraries:

So what is going on here? Well, the virtual environment we created comes with its own Python interpreter, and the Python interpreter in your code editor is probably set to the system-wide Python interpreter. This means that the code editor doesn’t know where to find the libraries we just installed in the virtual environment.

To fix this, press Ctrl+Shift+P in VS Code to open the command palette, then type Python: Select Interpreter and select the Python interpreter from the virtual environment you created. You can find the correct one easily by comparing your root project name with the interpreter name. My root folder is FINX_LANGGRAPH, so I can find mine in the list under this name:

When you click this the red squiggly lines should go away and you’re now using the correct Python interpreter.

With that out of the way, let’s look at the imports here:

  • StrOutputParser is a class that will help us parse the output from the LLMs into a string format. Normally when you get the return from ChatGPT, we have to index into the response.choices[0].message.content to get the response. Just think of this as a convenience class that will help us with this.
  • ChatPromptTemplate is a class that will help us create a template for our chat prompts. This will make it easier to create prompts for the LLMs.
  • ChatOpenAI is a class that will basically just allow us to create an instance of OpenAI and use it with LangChain.

The value here of these output parsers and prompt templates is that they are a unified interface that we can use in the same manner without changes even if we change the LLM we are using halfway through our project or in the future.

Prompt templates

We then import the set_environment_variables function from the setup_environment file we created earlier. Now let’s continue our code by creating a prompt template:

set_environment_variables()


french_german_prompt = ChatPromptTemplate.from_template(
    "Please tell me the french and german words for {word} with an example sentence for each."
)

First, we make sure to call our set_environment_variables function to set our API key. As a simple example prompt, I’ll create an example that asks for the French and German words for a given word, along with an example sentence for each. This is just a simple example to show the parts of LangChain before we get into more complex examples.

The {word} part is the template variable that we can replace with any word we want to ask about. We then create a ChatPromptTemplate instance using the from_template method and pass in our prompt string. The ChatPromptTemplate class will help us create prompts for the LLMs in a more convenient way and basically deals with formatting message history like this:

## Example of a ChatPromptTemplate
template = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful AI bot. Your name is {name}."),
            ("human", "Hello, how are you doing?"),
            ("ai", "I'm doing well, thanks!"),
            ("human", "{user_input}"),
        ])

We need only a single message here though, which is why we use the from_template method. In this case, LangChain will assume this to be a human message so this will result in:

template = ChatPromptTemplate.from_messages([
            ("human", "Please tell me the french and german words for {word} with an example sentence for each.")
        ])

Creating a chain

Now that we have a prompt template to create our prompts, let’s continue:

llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
output_parser = StrOutputParser()

french_german_chain = french_german_prompt | llm | output_parser

First, we define our LLM instance using the ChatOpenAI class and pass in the model we want to use. I’ll be using gpt-3.5-turbo-0125 as it is more than enough for the simple test we’re doing here. If at any part in the course you want to use GPT-4-turbo instead then feel free to do so.

We’ve already set the API key to the environment variable so we don’t need to worry about it. We then create an instance of the StrOutputParser class to parse the output from the LLMs into a string response as discussed earlier.

Now that we have three building blocks, it is time for one of LangChain’s important concepts, “chains”. We can simply use the | operator to chain these building blocks together. This operator is taken from the pipe operator in Unix, which is used to chain commands together.

In this case, we take the french_german_prompt as the entry point of our chain, and we pipe the resulting prompt into our llm, making an LLM call. We then pipe the output into our output_parser to get the string response. Notice how easy and readable the chain is. We use chains to build stuff with large language models, hence the name LangChain. This piping style of syntax above is often referred to as LCEL or LangChain Expression Language.

Running the chain

Now let’s actually try and run this chain. To do this we can simply use the invoke method on our chain:

result = french_german_chain.invoke({"word": "polar bear"})
print(result)

We can technically also just pass in the string "polar bear" as we only have a single variable, but it’s better practice to use a dictionary like this as you may have multiple variables in your prompt. So go ahead and run this Python file and you should get something like the following:

French: ours polaire
German: Eisbรคr

Example sentence in French: L'ours polaire est un animal emblรฉmatique de l'Arctique.
Example sentence in German: Der Eisbรคr ist das grรถรŸte an Land lebende Raubtier der Welt.

The order or structure may be slightly different as we didn’t specify any specific desired output structure, but that’s not the point here, it works! You’ll notice LangChain is very easy to read and understand, and this exact same code can be used with other LLMs with little modification.

We can also very easily stream the response instead. Edit your code like this, commenting out the previous invoke call and calling stream instead:

# result = french_german_chain.invoke({"word": "polar bear"})
# print(result)

for chunk in french_german_chain.stream({"word": "polar bear"}):
    print(chunk, end="", flush=True)

So for every chunk in the stream that results from calling french_german_chain.stream with the word “polar bear”, we print the chunk to the console. The end="" and flush=True are just to make sure there are no line breaks in between print messages and that the output is printed immediately to the console.

Now if you run it again, you’ll see the tokens being streamed and written to your console in real time.

Another useful method provided for us is batch, so let’s give that a spin as well:

# for chunk in french_german_chain.stream({"word": "polar bear"}):
#     print(chunk, end="", flush=True)

print(
    french_german_chain.batch(
        [{"word": "computer"}, {"word": "elephant"}, {"word": "carrot"}]
    )
)

This time we pass in a list of dictionaries with one entry for each run in the batch. Running this will give the responses in a list, one for each entry in the batch:

["French: \nComputer - Ordinateur \nExample sentence: J'utilise mon ordinateur pour travailler et regarder des films.\n\nGerman:\nComputer - Computer \nExample sentence: Mein Computer ist schon ein paar Jahre alt, aber er funktioniert immer noch einwandfrei.", "French: รฉlรฉphant\nExample sentence: J'ai vu un รฉlรฉphant au zoo.\n\nGerman: Elefant\nExample sentence: Der Elefant im Zoo war sehr groรŸ.", "French: carotte\nExample sentence: J'ai achetรฉ des carottes pour faire une soupe.\n\nGerman: Karotte\nExample sentence: Ich esse gerne Karotten als Snack."]

Now go ahead and comment that one out as well and let’s check the properties of our chain:

# print(
#     french_german_chain.batch(
#         [{"word": "computer"}, {"word": "elephant"}, {"word": "carrot"}]
#     )
# )

print("input_schema", french_german_chain.input_schema.schema())
print("output_schema", french_german_chain.output_schema.schema())

And if we run that we get a JSON schema that shows the in and outputs of our chain:

input_schema {'title': 'PromptInput', 'type': 'object', 'properties': {'word': {'title': 'Word', 'type': 'string'}}}
output_schema {'title': 'StrOutputParserOutput', 'type': 'string'}

We can see that the input takes a single object variable that needs to have a key word with a string value. If we add more variables to our prompt, we’ll see them in the schema as well. The output schema is a simple string because we used the StrOutputParser to parse the output into a string in the end.

Adding complexity

That is the basics of an extremely simple chain in LangChain. So let’s make it a bit more complex here. In this same file let’s declare a second chain and let’s say for the sake of a simple demonstration that this second chain is supposed to check if the output of the first chain is correct or not. (We’re just using simple examples here to save time and get to the good stuff faster).

So down below the other stuff in the langchain_basics.py file, let’s define the prompt template for our second chain:

# print("input_schema", french_german_chain.input_schema.schema())
# print("output_schema", french_german_chain.output_schema.schema())


check_if_correct_prompt = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant that looks at a question and its given answer. You will find out what is wrong with the answer and improve it. You will return the improved version of the answer.
    Question:\n{question}\nAnswer Given:\n{initial_answer}\nReview the answer and give me an improved version instead.
    Improved answer:
    """
)

This time we have two variables in our prompt, question and initial_answer. We ask it to give an improved version of the first answer. The first answer is likely to be perfect already but again this is just for the sake of a quick demonstration.

We can reuse the llm and output_parser instances we created earlier, so let’s just create a new chain with the new prompt:

check_answer_chain = check_if_correct_prompt | llm | output_parser

Now we will need to run the input through the first chain, and then we need to keep both the original prompt from the first chain and the answer we get back from the first chain to pass them into the second one. So let’s do that:

def run_chain(word: str) -> str:
    initial_answer = french_german_chain.invoke({"word": word})
    print("initial answer:", initial_answer, end="\n\n")
    answer = check_answer_chain.invoke(
        {
            "question": f"Please tell me the french and german words for {word} with an example sentence for each.",
            "initial_answer": initial_answer,
        }
    )
    print("improved answer:", answer)
    return answer

So we define a function run_chain that takes a word as string input and will return a string. The initial answer is our return after we invoke the french_german_chain with the word.

We then print this answer and pass it into the check_answer_chain along with the original prompt, by passing both through a dictionary with the appropriate keys matching our prompt template. We print the improved answer and return it.

Now let’s run this function with a word:

run_chain("strawberries")

I apologize if I suddenly gave you a craving for strawberries! ๐Ÿ“๐Ÿ“๐Ÿ“ Run it and your output will be something like this:

initial answer: French: fraises
Example sentence: J'adore manger des fraises en รฉtรฉ.

German: Erdbeeren
Example sentence: Im Sommer esse ich gerne Erdbeeren mit Sahne.

improved answer: French: fraises
Example sentence: J'adore manger des fraises en รฉtรฉ.

German: Erdbeeren
Example sentence: Im Sommer esse ich gerne Erdbeeren.

Now of course both of them are fine and there wasn’t really anything to improve as the question is very simple, but we successfully ran a chain through another chain.

So that works fine, but you can see passing the values around to the second chain is a bit cumbersome. Now imagine we want to add a 3rd step to the chains above or even a 4th one. A conditional split path perhaps? If x then call chain a and else call chain b.

Using the above method would be a bit of a mess, so we’d have to create some kind of state object instead that has all the data in a single object so that we can pass this around between chains, with each chain adding or modifying the state object as needed.

This is actually a pretty good solution to the problem and as it happens, this is pretty much what LangGraph will do for us. Before we get there though, we need to take a short detour to LangSmith and also learn how to write our own tools in LangChain so we can use the power of function calling and agents to fully leverage the power of LangGraph and create some really cool stuff. That’s it for part 1 of this course, I hope you enjoyed it and I’ll see you in the next one!

LangSmith and Writing Tools

Hi and welcome back to part 2 of the tutorial series where we will be having a look at LangSmith which will help us debug our LLM creations and also write tools that our powerful agents will be able to execute from part 3 onwards.

LangSmith setup

So what is LangSmith? LangSmith is another part of the LangChain ecosystem that will help us during the development and debugging of our LLM applications

  • LLM Debugging and Testing: It will make it easier to identify and fix errors and test our applications to ensure they work as expected.
  • Monitoring and Evaluation: It also provides tools to monitor performance and effectiveness, especially helpful if your project needs fast response times.
  • Easy integration: LangSmith integrates seamlessly with LangChain and is very easy to set up as you will see.

First we’ll need to get an API key for LangSmith, so it can keep track of our traces for us using our unique identifier. This is free for single-user accounts with up to 3000 traces per month, which is more than enough for general development and testing. You shouldn’t have to provide any payment details unless you want to switch to a heavier plan later on.

Go to https://smith.langchain.com/ and sign up using your GitHub, Google, or email address:

After you have made your account and logged in at smith.langchain.com find the โš™๏ธgear icon in the bottom left corner and click it, then find the Create Api Key button to generate your API key:

Copy your API key and then let’s open our existing .env file in the root of our project and edit it by adding the LangSmith API key (no spaces or quotation marks):

OPENAI_API_KEY=your_api_key_here
LANGCHAIN_API_KEY=your_api_key_here

Save and close your .env file. We don’t need to install LangSmith as it is already included in the LangChain package. Let’s move on to our existing setup_environment.py file to add the LangSmith setup to our reusable setup script.

In order to enable LangSmith tracing, we need to do three things.

  • Provide the LangSmith API key
  • Set the tracing environment variable to true
  • Set the project name so we can distinguish between different projects in our LangSmith dashboard

Replace all the code so far in the setup_environment.py file with the following:

import os
from datetime import date

from decouple import config


def set_environment_variables(project_name: str = "") -> None:
    if not project_name:
        project_name = f"Test_{date.today()}"

    os.environ["OPENAI_API_KEY"] = str(config("OPENAI_API_KEY"))

    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_API_KEY"] = str(config("LANGCHAIN_API_KEY"))
    os.environ["LANGCHAIN_PROJECT"] = project_name

    print("API Keys loaded and tracing set with project name: ", project_name)

We added the date from datetime import so we can use the date as the project name. Then we added an argument project_name to the function so we can set a custom project name for the LangChain dashboard. If no project name is provided, it will default to Test_{date.today()} so we still have something to distinguish it by even if we forget to set the name.

The OPENAI_API_KEY environment variable was already there, but now we have added three more environment variables for LangSmith. LANGCHAIN_TRACING_V2 enables LangSmith tracing when set to true, and then we have the LANGCHAIN_API_KEY and LANGCHAIN_PROJECT environment variables which LangSmith will read to know who we are and group the traces per project in our dashboard.

Make sure you use the exact same names for the environment variables. Save and close the file. Now let’s see what LangSmith will do for us by giving it a test run. Open the langchain_basics.py file that we created in part 1 and change only the following line:

set_environment_variables()

to add a project name:

set_environment_variables("Simple LangChain test")

Now go ahead and run the langchain_basics.py file from part 1 again without changing anything about the code. LangSmith will now trace the execution of the code as we are using the updated set_environment_variables script.

After running the script, go to the LangSmith dashboard at https://smith.langchain.com/ and make sure you’re logged in. In your dashboard you will see the project name you set in the overview:

We can see that our Simple LangChain test project has been run a total of 2 times (1 run for each chain), with an error rate of 0%. We can see how many of the responses were streamed and how many tokens have been used in total for this project name.

Scrolling to the right reveals additional details:

We can see that our total cost for all runs on this project so far is $0.000237 and we have a latency of around 3 seconds per run. We also have the most recent run for reference. Go ahead and click the project for more details:

We have two entries, one for the french_german_chain and one for the check_answer_chain. When we use graphs later these will no longer be separate but combined into a single trace. Go ahead and click the lower one with and input of strawberries to see the details:

We can see the RunnableSequence which is the overall chain, and then the three sub-elements that we had in our chain, the ChatPromptTemplate, the LLM, and the StrOutputParser. On this page we see the input and output for the entire chain, and if you click on any of the steps like ChatOpenAI you will see the in- and output for that specific step:

Now our trace here is not that helpful as it is both very simple and broken up into two separate parts for each chain we ran, but this will be very helpful for easy feedback and debugging when we get to our graphs, which will combine complex systems into a single trace.

Tools – Image generator

Now let’s continue on and take a look at tools. If we want to have powerful multi AI-agent teams working away for us we need to be able to give them tools or functions to call. Naturally LangChain also comes with a handy integration for writing tools using a somewhat more pleasant syntax than the vanilla OpenAI tools.

We will be writing two tools, both of which we will use in our LangGraph graph in the next part. One of the tools will use Dall-e to generate an image (using our OpenAI key we already have) and download and save the image to disk. The other tool is going to get the current weather in a certain location. There are multiple ways in which tools can be defined in LangChain, but we will be using the latest convenient syntax here using the @tool decorator.

First let’s create a new folder called images and another one called tools in the root of our project, and then inside the tools folder create a new file named image.py:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images          โœจNew empty folder
    ๐Ÿ“‚ tools           โœจNew folder
        ๐Ÿ“„ image.py    โœจNew file
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py

In the image.py file we will define our first tool and see how this works. Let’s get started with our imports:

import uuid
from pathlib import Path

import requests
from decouple import config
from langchain.tools import tool
from openai import OpenAI
from pydantic import BaseModel, Field

As we will also download the image, we import uuid to create a unique filename so we don’t get clashes. We will use pathlib to define the path where we will save the image and requests to send an HTTP request to download the generated image from the internet.

We also import config from decouple to read our .env file, tool from langchain.tools to define our tool, OpenAI from openai to make a request to Dall-e, and BaseModel and Field from pydantic to define the input of our tool.

requests is already installed as a dependency of LangChain itself, and we already installed openai. Let’s make sure we install pydanctic as well by running:

pipenv install pydantic==1.10.13

Make sure you use this version as it plays nicely with the current LangChain versions. If you install V2 instead you will have to use different imports from mine.

As this is the only place where we will use the vanilla OpenAI client, we’ll just declare it here instead of integrating it into the setup_environment.py script. Add the following:

IMAGE_DIRECTORY = Path(__file__).parent.parent / "images"
CLIENT = OpenAI(api_key=str(config("OPENAI_API_KEY")))

To get a path to the images folder in the root of our project we first use Path(__file__) to get the path to the current file, then parent to go up one level to the tools folder, and then another parent to go up to the root of our project. We then add /images to get the path to the images folder.

We also create a CLIENT object using the OpenAI class and our API key from the .env file.

Image downloader

Let’s first create a helper function that takes an image URL and downloads and saves that image to our /images folder. This is not our tool but just a quick helper we can call from inside our tool later on. continuing in image.py add the following:

def image_downloader(image_url: str | None) -> str:
    if image_url is None:
        return "No image URL returned from API."
    response = requests.get(image_url)
    if response.status_code != 200:
        return "Could not download image from URL."
    unique_id: uuid.UUID = uuid.uuid4()
    image_path = IMAGE_DIRECTORY / f"{unique_id}.png"
    with open(image_path, "wb") as file:
        file.write(response.content)
    return str(image_path)

We define a function image_downloader that takes an image URL as input and returns a string with the path to the downloaded image. If the image URL is None we return a message saying that no image URL was returned from the API. We then use requests.get to download the image from the URL and check if the status code is 200 which means the request was successful, again sending a message if it was not successful.

We then create a unique ID using by instantiating a new UUID class object using uuid.uuid4(). We then create a path to the image using the IMAGE_DIRECTORY we defined earlier and the unique ID with a .png extension. Finally, we open the file in write binary mode (wb) and write the content of the response to the file, returning the path to the image as a string.

The reason we do not raise an error but send a string if the download fails is that an error will blow up our LLM application, but if we return a string instead the LLM agent will see that something went wrong and it can try to fix it or try calling the function again.

Input interface

Before defining our tool itself, we’re going to define the exact input interface that our tool will accept. Behind the scenes LangChain will use this to generate the JSON schema that the OpenAI API requires for function and tool calling. Add the following:

class GenerateImageInput(BaseModel):
    image_description: str = Field(
        description="A detailed description of the desired image."
    )

We use pydantic to define a GenerateImageInput class which inherits from BaseModel This will allow us to clearly define the input arguments our tool will need in order to run, as the LLM will need this information when calling a tool or deciding whether to call a tool or not.

We define a single field image_description which is a string and we use Field to add a description to the field. So we want an input argument of image_description which is a string that describes the image we want to generate. If you need multiple arguments you can define these here as well in the same fashion. For our uses, this one argument will do here.

Tool definition

Now it’s time to write our actual tool using the @tool decorator. Add the following:

@tool("generate_image", args_schema=GenerateImageInput)
def generate_image(image_description: str) -> str:
    """Generate an image based on a detailed description."""
    response = CLIENT.images.generate(
        model="dall-e-3",
        prompt=image_description,
        size="1024x1024",
        quality="standard",  # standard or hd
        n=1,
    )
    image_url = response.data[0].url
    return image_downloader(image_url)

We start with the @tool decorator which takes the name of the tool as the first argument and the schema of the input arguments as the second argument, passing in our GenerateImageInput class we defined earlier.

After that, we declare the function itself, which takes a string as input with the image description and will return an image path in string format. Note that we included a docstring that describes what the tool does: """Generate an image based on a detailed description.""".

This docstring is required when defining tools using the @tool decorator and is the description that will be used for the OpenAI tool schema generated behind the scenes that helps the LLM agent choose which function(s) to call. For this reason you must make sure it is an adequate description of what the tool does and what it’s purpose is.

After that we simply make a vanilla Dall-e image generation API request using CLIENT.images.generate with the model set to dall-e-3, the prompt set to the image_description we received as input, the size set to 1024x1024, the quality set to standard, and the number of images to generate set to 1. You can of course call on any image generation API you want, but as we already have an OpenAI key set we will use Dall-e here to keep things simple.

We then extract the URL by accessing response.data[0].url and return the result of calling the image_downloader function we defined earlier with the image URL as input. As the image_downloader function will save the image to file and return a path to it in stringform that fulfills our promise of having the generate_image function return a string file path to the image requested.

Test run

Tools are just functions except we clearly defined the input arguments, name, and the purpose of the function using a docstring. Now let’s give our tool a test run by adding the following to the bottom of the file:

if __name__ == "__main__":
    print(generate_image.run("A picture of sharks eating pizza in space."))

If this file is the main file being run, the generate_image function will be called for a quick test. If we import the tool from elsewhere this code block will not be triggered. Note that we call the run method on a tool in order to run it, this is part of the defined interface for LangChain tools.

So go ahead and run this file and you should see an image appear in the images folder in the root of your project, indicating that it worked. Make sure you didn’t forget to create the empty images folder in the root of your project.

My image here is pretty epic, I must say ๐Ÿฆˆ๐Ÿ•๐Ÿš€:

It is interesting to see that Dall-e choose peperoni pizza as a default pizza. Sorry if I made you hungry yet again ๐Ÿ˜…๐Ÿ•๐Ÿ•.

Weather tool

Ok with that settled, save and close up this file, and let’s move on to our second tool which will get the current weather in a certain location. We’ll go through this one quickly as the process is very similar to the first tool.

First, sign up for a free account at https://www.weatherapi.com/. They will give you pro for 14 days for free but it will automatically switch back to free afterward and you don’t have to provide any payment or credit card information, so don’t worry about it, the sign up will be pretty fast and totally free.

Signup and then get yourself an API key:
WeatherApi.com

Now add your new API key to your .env file:

OPENAI_API_KEY=your_api_key_here
LANGCHAIN_API_KEY=your_api_key_here
WEATHER_API_KEY=your_api_key_here

Save and close that and now lets create a new file in the tools folder called weather.py:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ tools
        ๐Ÿ“„ image.py
        ๐Ÿ“„ weather.py    โœจNew file
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py

In the weather.py file we will define our second tool. Let’s get started with our imports:

from json import dumps

import requests
from decouple import config
from langchain.tools import tool
from pydantic import BaseModel, Field

We import dumps from json too which will allow us to convert a dictionary to string format, as LLMs can only handle strings. The rest of the imports are familiar from the generate_image tool we made. Let’s define the input interface for our weather tool using a pydantic model:

class WeatherInput(BaseModel):
    location: str = Field(description="Must be a valid location in city format.")

This is the same as the other tool, again make sure the description is a good one as the LLM agent will make use of this. Let’s define our function that will call the weather API and return the response. Add the following:

@tool("get_weather", args_schema=WeatherInput)
def get_weather(location: str) -> str:
    """Get the current weather for a specified location."""
    if not location:
        return (
            "Please provide a location and call the get_current_weather_function again."
        )
    API_params = {
        "key": config("WEATHER_API_KEY"),
        "q": location,
        "aqi": "no",
        "alerts": "no",
    }
    response: requests.models.Response = requests.get(
        "http://api.weatherapi.com/v1/current.json", params=API_params
    )
    str_response: str = dumps(response.json())
    return str_response

We start with the @tool decorator with the name of the tool and the input schema as before. We then define the function itself which takes a string as input with the location and will return a string with the weather data. We include a docstring that describes what the tool does and is for so the LLM agent can make use of this.

If the location is not provided we return a message asking the LLM to provide a location and call the function again. We then define the API parameters as a dictionary with the API key which we read from the .env file using config, the location (q), and two optional parameters aqi (air quality index) and alerts set to no.

We then make a request to the weather API using requests.get with the URL http://api.weatherapi.com/v1/current.json and the API parameters. This will return a Response object from requests.models which we can convert to a dictionary using it’s .json() method. We then convert the dictionary to a string using the dumps (dump string) function we imported and return the string with the weather data.

Let’s add a quick test just like with the other tool:

if __name__ == "__main__":
    print(get_weather.run("New York"))

Now go ahead and give it a test run and you should see something like the following:

{"location": {"name": "New York", "region": "New York", "country": "United States of America", "lat": 40.71, "lon": -74.01, "tz_id": "America/New_York", "localtime_epoch": 1711278898, "localtime": "2024-03-24 7:14"}, "current": {"last_updated_epoch": 1711278000, "last_updated": "2024-03-24 07:00", "temp_c": -0.6, "temp_f": 30.9, "is_day": 1, "condition": {"text": "Sunny", "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png", "code": 1000}, "wind_mph": 2.2, "wind_kph": 3.6, "wind_degree": 2, "wind_dir": "N", "pressure_mb": 1020.0, "pressure_in": 30.13, "precip_mm": 0.0, "precip_in": 0.0, "humidity": 49, "cloud": 0, "feelslike_c": -5.9, "feelslike_f": 21.5, "vis_km": 16.0, "vis_miles": 9.0, "uv": 2.0, "gust_mph": 15.8, "gust_kph": 25.4}}

Excellent! We now have some functions for our agents to play around with while we explore building more complex systems using graphs.

Simplifying tool imports

There is one quick thing left to do before we move on to the next part. The way our tools folder is set up right now we would have to import the tools from the tools folder in a kind of awkward way:

# Example, no need to copy - we will not use this code
from tools import weather, image

weather.get_weather("Alabama")
image.generate_image(
    "A T-rex made from kentucky fried chicken is attacking the white house."
)

This weather.get_weather is kind of awkward so let’s create a __init__.py file in the tools folder to make it easier to import the tools. Create a new file called __init__.py in the tools folder:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py    โœจNew file
        ๐Ÿ“„ image.py
        ๐Ÿ“„ weather.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py

In the __init__.py file add the following:

from .image import generate_image
from .weather import get_weather

This will import the generate_image and get_weather tools from their respective files and make them available when importing the tools folder. It has effectively made the tools folder a package that can be imported from as a single entity.

Now the above example can be changed to this:

# Example, no need to copy - we will not use this code
from tools import get_weather, generate_image

get_weather("Alabama")
generate_image("A T-rex made from kentucky fried chicken is attacking the white house.")

This is a lot more sensible. Save and close the __init__.py file and we are done with this part. In the next part, it is time to dive into LangGraph and start building some more complex systems using agents and tool calls to interlink them into a graph that can do some cool stuff. See you there!

P.S. I know you are secretly curious what the T-rex made from KFC attacking the white house looks like ๐Ÿ˜…๐Ÿ—๐Ÿฆ–๐Ÿ›๏ธ. Here is is:

Kentucky Fried T-rex, anyone?

LangGraph Introduction

Hello and welcome back to part 3 of this tutorial series. In this part, we’ll be getting started with LangGraph. Instead of having a lot of explanation before we start, we’ll see how stuff works as we go along. So without further ado, let’s just jump right in.

Let’s start by actually installing LangGraph, as it doesn’t get installed by default with LangChain. To install LangGraph, you can use the following command in your terminal:

pipenv install langgraph==0.0.30 langchainhub==0.1.15

Once you’ve installed LangGraph, let’s start by creating a new file called simple_langgraph.py:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ tools
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py    โœจNew file

Over the next three parts, we’ll be looking at different ways in which you can use LangGraph to chain LLMs and tools together. In this first part we’ll be looking at a simple classic LLM –> goes to a tool executor –> and then back to LLM type setup.

Open up simple_langgraph.py and let’s start by importing the necessary modules:

import operator
from typing import Annotated, TypedDict, Union

from colorama import Fore, Style
from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain_core.agents import AgentAction, AgentActionMessageLog, AgentFinish
from langchain_core.messages import BaseMessage
from langchain_core.runnables.base import Runnable
from langchain_openai.chat_models import ChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_executor import ToolExecutor

from setup_environment import set_environment_variables
from tools import generate_image, get_weather

That is a lot of stuff! Don’t worry, most of it is actually not as complex as it seems. Usually, I’ll go over all the imports before we get started, but as there are quite a few to go through, I’ll cover each import when we get to the part where it’s used instead. For now, just have them copied.

Next, we’ll set the environment variables and define a couple of constants:

set_environment_variables("LangGraph Basics")

LLM = ChatOpenAI(model="gpt-3.5-turbo-0125", streaming=True)
TOOLS = [get_weather, generate_image]
PROMPT = hub.pull("hwchase17/openai-functions-agent")

We reused our set_environment_variables function from the previous part to set the environment variables and set the name for the LangSmith traces to LangGraph Basics. We then define our LLM just like we did in part 1, also setting the streaming parameter to True. We then define a list of tools which is literally just a list containing the two tools that we wrote.

The LangChain Hub

For the prompt template, we pull it from the LangChain Hub this time, mostly because I want to show you that it exists! The LangChain Hub is kind of like a mini-GitHub for storing LangChain ChatPromptTemplates just like the simple ones we wrote in part 1. You can push new commits to your templates and pull them like we just did here, kind of like GitHub.

You can go to https://smith.langchain.com/ and scroll down to find the Hub button:

Click it to visually browse the prompts available on the hub:

You can use this as a convenient place to store your prompts. You can also set them to private if you don’t want to share them with the world and you can even fork other public prompts that you like to your own repositories. It’s a handy tool for development. For production or highly sensitive company data, you might want to store your prompts in a more secure location.

If we look up the prompt we just pulled, we can see that it is a fairly simple prompt:

It has an extremely basic system message of "You are a helpful assistant" and we can see that it has placeholders for chat_history, human input and an agent_scratchpad. The chat_history and input are kind of self-explanatory in that they hold the chat history so far and the human input, but what about this agent_scratchpad?

The agent_scratchpad is kind of like a place where the agent can take notes while going through its reasoning process of what action should be taken next and what functions should be called. Think of it as a notepad where the LLM can jot down its thoughts. Think of it kind of like the following:

user:
    "Can you recommend me a zombie game from the year 2022?"

    > Entering new AgentExecutor chain...
    Thought: Oh, I love zombie games! There are so many great ones out there. Let me think about the best zombie game from 2022.
    Action: use_search_engine
    Action Input: "best zombie game 2022"

    Observation:[{list of search result objects for query "best zombie game 2022"}]
    There are three great zombie games from 2022 that I found: Zombie Cure Lab, Zombie Survivors, and SurrounDead. Let me think about which one to recommend.
    Action: use_search_engine
    Action Input: "Zombie Cure Lab"

    Observation:[{list of search result objects for query "Zombie Cure Lab"}]
    Zombie Cure Lab is a game where you manage a lab and try to cure the zombie virus. (Bunch more info here yadayada...) I recommend Zombie Cure Lab as the best zombie game from 2022.

    Final Answer: The best zombie game from 2022 is Zombie Cure Lab.

This is just a conceptual example here to describe the idea, but the agent takes reasoning steps and makes observations along the way, first deciding to call a search engine tool to better answer the user question, then deciding to call the search engine tool to get more information on one of the games in particular, and then finally deciding that it has enough information to answer the user question.

So the agent_scratchpad is used to store these intermediate observations on what action to take next, but also to decide when the agent is done, so that it doesn’t just keep looping indefinitely. We’ll get back to how we can see when the agent is done in a moment.

The State Object

Ok, we have an LLM, some tools, and a prompt template. The next thing we need is a state object to keep track of the state for each step along our graph. So a LangGraph is kind of like a state machine, and it is going to take this state object and pass it along each node of the graph. Let’s look at a simplified example:

# Simplified example
StateObject():
    user_input = "please do a for me"
    chat_history = [list of previous chat messages for context...]
    am_i_done = False
    steps_taken = []

So say we have this state object above. We have received the user input question, and whatever chat history has come before if we have decided to implement memory. We have a flag am_i_done which is obviously set to False at the start, and we have a list of steps_taken which is empty at the start. Now we hand this state object to node A in our graph ->

# Simplified example Node A
StateObject():
    user_input = "please do a for me"
    chat_history = [list of previous chat messages for context...]
    am_i_done = False
    steps_taken = ["action_a was taken"]

It does some action we will just call action_a, which has taken it a step closer to answering the user question but it is not quite done yet so the am_i_done flag is still set to false. Now node A passes this state object to node B in our graph ->

# Simplified example Node B
StateObject():
    user_input = "please do a for me"
    chat_history = [list of previous chat messages for context...]
    am_i_done = True
    steps_taken = ["action_a was taken", "action_b was taken"]

This node does some action_b stuff and now has the final answer it needs to give to the user. It sets the am_i_done flag to True because it is done. We can use this am_i_done flag to test if the graph is completed yet (e.g. the user question or request has been fully answered).

So as the graph traverses over the nodes we define, each node will receive the state object, update it where needed, and then pass it along to the next node, or perhaps back to the previous one, or sideways to node D if a certain condition is met. So let’s define the real state object that we will be using:

class AgentState(TypedDict):
    input: str
    chat_history: list[BaseMessage]
    agent_outcome: Union[AgentAction, AgentFinish, None]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]

We use a TypedDict to define a specific dictionary structure, defining the keys that this dictionary will have and the types of values that will be stored for each of those keys. The first entry is simply the user input, which is a str string value.

The second entry is the chat history, which is a list of BaseMessage objects. A BaseMessage object is just any one of the lines of this object below where you have a message and the originator of the message like “system”, “human”, or “ai”:

# Example BaseMessages
("system", "You are a helpful AI bot. Your name is {name}."),
("human", "Hello, how are you doing?"),
("ai", "I'm doing well, thanks!"),
("human", "{user_input}"),

The third item in the state object will be agent_outcome. The agent here will do its thing and then either return an AgentAction object or an AgentFinish object to us.

  • AgentAction: An AgentAction object simply contains the name of the tool the agent wants to call and the input arguments for that tool call, maybe like get_weather and {"location": "New York"}.
  • AgentFinish: An AgentFinish object simply means that the agent considers its task finished and holds the final return_values inside.

Using this agent_outcome object we can see what the next step is or if it is done.

The fourth and last entry in the AgentState object is a bit easier to read from the inside. We have a list of tuples where each tuple contains an AgentAction object and a str string. The AgentAction here is the same object that we described in the step above, containing a tool to be called and its input arguments. The difference here is that the step is already taken and the string which is the second item in the tuple is the tool output after it was called. So something like this:

## Fictional example object
[
    (
        AgentAction(tool="get_weather", input={"location": "New York"}),
        "{API response JSON object...}",
    ),
    (
        AgentAction(tool="generate_image", input={"image_description": "cat"}),
        "Path/to/image.png",
    ),
]

The Annotated type hint is used to add metadata to the type hint. In this case, we are using the operator.add function to tell the type checker that this list will be added to, so we are describing the AgentState object’s intermediate_steps list as a list that will be added to, like the example above.

The Agent

Now that we have our state object defined, we will define our agent that will have access to both the generate_image and get_weather tools:

runnable_agent: Runnable = create_openai_functions_agent(LLM, TOOLS, PROMPT)

We use the create_openai_functions_agent function we imported from LangChain to create an agent that has access to the LLM, the tools, and the prompt we defined so far. LangChain will make this into an OpenAI compatible agent by combining them for us into a Runnable type object. We have seen this Runnable object before in part 1 in the form of our chains. All Runnable type objects have the invoke, stream, and batch methods just like the chains we used in part 1.

Before we move on with the nodes and graph let’s test the agent we have so far. We’ll manually create a quick input here (as we haven’t built our graph yet) and then call invoke on the agent:

inputs = {
    "input": "give me the weather for New York please.",
    "chat_history": [],
    "intermediate_steps": [],
}

agent_outcome = runnable_agent.invoke(inputs)
print(agent_outcome)

Now go ahead and run this to test the agent so far and you should see something like this:

API Keys loaded and tracing set with project name:  LangGraph Basics
tool='get_weather' tool_input={'location': 'New York'} log="\nInvoking: `get_weather` with `{'location': 'New York'}`\n\n\n" message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"location":"New York"}', 'name': 'get_weather'}}, response_metadata={'finish_reason': 'function_call'})]

We can see the agent wants to call the get_weather tool with the input {"location": "New York"}, so it’s asking us to call this function with these input arguments. Of course, it stopped running there as we haven’t linked up any other nodes yet, but we know that the agent is working so far.

Go ahead and remove the test inputs and agent_outcome code. Just for clarity, here is what you should have so far:

import operator
from typing import Annotated, TypedDict, Union

from colorama import Fore, Style
from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain_core.agents import AgentAction, AgentActionMessageLog, AgentFinish
from langchain_core.messages import BaseMessage
from langchain_core.runnables.base import Runnable
from langchain_openai.chat_models import ChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_executor import ToolExecutor

from setup_environment import set_environment_variables
from tools import generate_image, get_weather


set_environment_variables("LangGraph Basics")

LLM = ChatOpenAI(model="gpt-3.5-turbo-0125", streaming=True)
TOOLS = [get_weather, generate_image]
PROMPT = hub.pull("hwchase17/openai-functions-agent")


class AgentState(TypedDict):
    input: str
    chat_history: list[BaseMessage]
    agent_outcome: Union[AgentAction, AgentFinish, None]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]


runnable_agent: Runnable = create_openai_functions_agent(LLM, TOOLS, PROMPT)

The Nodes

So now the first thing we need to do is to create some nodes here so we can string them together into a graph. Let’s start with the Agent Node:

def agent_node(input: AgentState):
    agent_outcome: AgentActionMessageLog = runnable_agent.invoke(input)
    return {"agent_outcome": agent_outcome}

We define the node as a simple function that takes input which will be the AgentState object for all nodes. It then calls the invoke method on the agent with the input and catches the return in a variable named agent_outcome which is of type AgentActionMessageLog. This agent_outcome will have either the AgentAction object or the AgentFinish object that we talked about earlier, indicating what the next step is or if the agent is done. Whatever is in the agent_outcome, this function simply returns it in a dictionary.

Now that we have an agent node we need another node to execute the tools that the agent wants to call. Let’s define the Tool Executor Node:

tool_executor = ToolExecutor(TOOLS)

def tool_executor_node(input: AgentState):
    agent_action = input["agent_outcome"]
    output = tool_executor.invoke(agent_action)
    print(f"Executed {agent_action} with output: {output}")
    return {"intermediate_steps": [(agent_action, output)]}

First, we create a new instance of the ToolExecutor class that we imported from LangGraph. This ToolExecutor is initialized by giving it our list of tools which includes two tools in this case. The ToolExecutor provides a prebuilt interface that will extract the function and arguments the agent wants to call from the AgentAction object and then call the function with the arguments so we don’t have to do this manually.

Then we define the tool_executor_node function which again is just a simple function with input (which will be the state object). We extract the agent_action from the input dictionary and then call the invoke method on the tool_executor object which will run whatever tool the agent wants to call for us.

We have a print statement just for our own visual feedback here, and then we return the intermediate_steps list with the agent_action and the output of the tool call. Notice that this is the intermediate steps list that we defined in the AgentState object and talked about earlier and will be added to whatever steps were already there.

Now that we have these two functions for the nodes, we need a way to test if we want to finish the graph because the Agent Node has arrived at the final answer or if we need to continue on to the Executor node because it needs to execute a tool call. We can do this by defining a function that will check if the agent is done:

def continue_or_end_test(data: AgentState):
    if isinstance(data["agent_outcome"], AgentFinish):
        return "END"
    else:
        return "continue"

This function takes the AgentState object as input. Then it simply indexes into the agent_outcome. We said earlier that the agent_outcome will either be an AgentAction object (if still working) or an AgentFinish object if the agent is done. So if the agent_outcome is an instance of AgentFinish we return "END" to signal that the graph is done, otherwise, we return "continue" to signal that the graph should continue.

Creating our Graph

Now that we have two nodes and a test to see if we need to continue (this is just a very simple first example to explain the concepts), we can define our graph. The main type of graph in LangGraph is called a StatefulGraph, which passes a state object around as we discussed. Each node then returns some kind of update to that state, either setting specific attributes or adding to the existing attribute like the intermediate_steps list.

Setting up our graph is easy:

workflow = StateGraph(AgentState)

workflow.add_node("agent", agent_node)
workflow.add_node("tool_executor", tool_executor_node)

workflow.set_entry_point("agent")

First, we instantiate a new StateGraph passing in our AgentState object that we defined. We then simply add our two nodes, giving them a string name and passing in the functions we wrote second. Lastly, we set the entry point to the agent node, which is the first node that will be called when we start the graph.

Now we have a graph with an entry point. The next step is to define the connections called edges between the nodes. This is also very easy:

workflow.add_edge("tool_executor", "agent")

workflow.add_conditional_edges(
    "agent", continue_or_end_test, {"continue": "tool_executor", "END": END}
)

First, we add an edge from the tool_executor node back to the agent node. After we execute a tool call, we always want to feed the result back into the agent node.

Then we add a conditional edge from the agent node. We pass in our continue_or_end_test function that will determine where this edge will lead. If the function returns "continue" we will go to the tool_executor node, and if it returns "END" we will go to the END node. The END node is a special pre-built node that was part of our imports when we started this file.

Our simple graph in visual form now looks like this:

Now that we have our graph defined, we need to take the final step which is to compile the graph before we can use it:

weather_app = workflow.compile()

Testing our Graph

Now let’s whip up a quick function to test our graph:

def call_weather_app(query: str):
    inputs = {"input": query, "chat_history": []}
    output = weather_app.invoke(inputs)
    result = output.get("agent_outcome").return_values["output"]  # type: ignore
    steps = output.get("intermediate_steps")

    print(f"{Fore.BLUE}Result: {result}{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}Steps: {steps}{Style.RESET_ALL}")

    return result

The function will take a string query. As input, we need to define the input key with the query and an empty chat_history list as we don’t have a previous history for now. We then call invoke on the weather_app graph object and catch the output in a variable named output. The agent_outcome will have an AgentFinish which has the return_values attribute that holds the final answer as we discussed.

# type: ignore is just for the type checker here as it doesn’t know that agent_outcome will always be an AgentFinish object and I don’t want to go too far into type hinting in this tutorial. If you don’t use type checking you won’t need the comment. We also extract the intermediate_steps list from the output into a variable named steps.

When we started the file we imported Fore and Style from the colorama library. This library has already been installed as a dependency of something else, so we didn’t have to install it. The Fore.BLUE sets the text foreground color to blue and the Style.RESET_ALL resets the color back to the default, repeating the pattern on the next line with yellow for easy readability.

Now we can test our graph by calling the function with a query:

call_weather_app("What is the weather in New York?")

Go ahead and run this and you should see the final answer in blue:

Result: The current weather in New York is sunny with a temperature of 35.1ยฐF (1.7ยฐC). The wind is coming from the north at 11.2 km/h. The humidity is at 52%, and
the visibility is 16.0 km.
Steps: All the steps here in yellow...

Good! That worked. The steps are a bit hard to read, but that is what we have LangSmith for. Head over to https://smith.langchain.com/ and check out your trace under the project name of LangGraph Basics. Take the one named LangGraph as the RunnableSequence one is from when we did the partial test before we built our graph:

We can see that the graph started with our agent, then went to the tool_executor, back to the agent, and then ended. Click on any of the steps to see more detail. Nice and readable right?

Something a bit cooler!

So let’s give our simple graph test here a bit of a bigger challenge! Comment out the old query and let’s ask something a bit harder:

# call_weather_app("What is the weather in New York?")

call_weather_app("Give me a visual image displaying the current weather in Seoul, South Korea.")

Let’s run this and see what we get (it should auto-save an image in the project’s images folder):

Result: Here is the visual image displaying the current weather in Seoul, South Korea:

![Seoul, South Korea Weather](c:\Coding_Vault\FINX_LANGGRAPH\images\152cf0e0-c50e-483b-be63-50ef40ea3255.png)

That’s pretty good! It has the temperature and the rain. I can confirm that it is currently dark and rainy over here and this also corresponds to the weather data the API sent back. Pretty dang cool right!?

If we look at the LangSmith trace we’ll see exactly what we expect:

The agent calls the weather function, it comes back to the agent which calls the image function, and then it ends by giving us the image. I’ll leave you to click on any of the steps if you want to see the in and outputs at each step.

Of course, we can put this information of wanting a visual image into the prompt so the user doesn’t have to type it and improve on this in many ways like directly displaying the image to the end user but that is not the point here, this is just a simple demonstration of how the edges and nodes come together to create a simple graph.

In the next part we’ll take this up a step. Where we basically have a single agent now, we’ll look at having a whole team of agents working together! I’ll see you in the next part!

P.S. I generated another one just for fun and it’s pretty good:

Multi-Agent LangGraph Teams Preparation

Hi and welcome back to part 4 of this tutorial series where we’ll once again be taking it up a step. We’ll basically compress the Agent and the Executor into a single node and then have multiple of these ‘agent and executor’ nodes inside of a team working together. First, we’ll cover the basic idea and do some short work to prepare the extra functions we will need, and then we’ll continue into the next part where we’ll put it all together into a multi-agent team that does the work for us while we sit back and relax!

Advantages of multi-agent teams

So why is this multi-agent thing useful in the first place? We can simply give one agent multiple tools right? Well, up to a point. If you give a single agent a prompt to first do thing A by calling function_a and then do thing B by calling function_b followed by either function_c or function_d depending on the output of function_b then the prompt of this agent is going to become a mess and it will also be fairly unreliable. The main advantages of multi-agent teams for more complex setups are:

  • Grouping responsibilities gives better results as agents will tend to perform better when they have a more focused task rather than a dozen tools and responsibilities to choose from.
  • Separate prompts will give better results as each prompt can have its own examples of exactly what we want it to do and how. We can even have a specific agent run on a fine-tuned version of ChatGPT that is specifically trained and optimized for that node’s task.
  • Easier development as you can work on, test, and evaluate each agent in insolation without it being connected to and breaking stuff elsewhere in the chain when you make improvements. It’s also easier to conceptually wrap your brain around the system as a whole.

There are many possible slight variations for how this could be implemented. You could have a shared scratchpad for example so that all of the agents can see what thought processes and work the other agents have done. The downside is that this is very verbose though and the amount of information exchanged may be pointlessly large.

Alternatively, you could have them be isolated as single LLM calls without a strong interconnection that basically operate independently but they are merely strung together in a chain. This may be a bit too isolated though.

The example we’ll be looking at here lies somewhere in the middle where we will have independent fully-fledged agents that have their own scratchpad and ability to call tools if needed but the result of each agent doing its independent work gets stored in a shared state object like we had in the previous part.

This will be supervised by a sort of ‘team supervisor’ node we’ll call an ‘agent supervisor’ that will use this overall state object with the work done so far to decide what happens next and who to call. The basic idea looks like this:

The user sends a query to the Team Supervisor. The Team Supervisor then has a team of agents and it decides who it should call on next to complete some work, it can choose any of the agents at any point. Every agent points back to the Team Supervisor so that the Team Supervisor gets to decide again after each step which agent is next or if the work has been completed, in which case it will return to the end user.

Ours will look slightly different but we’ll build a diagram for it as we go along.

Tavily API

Before we jump in we’ll need to add another API key to our .env and setup_environment.py files. We will be using the Tavily API lightly during this part and again in the next part of the series. Go to https://app.tavily.com/ and sign up for a free API key.

Tavily is a search engine optimized for AI agents and we can use it to have an agent search the internet. One of the reasons I chose Tavily here is that LangChain comes with pre-built tools for Tavily that we can just import and use as is, allowing us to focus more on learning about LangGraph as we have one less tool to write. You can just use your Google account for quick and easy sign up and it will cost you nothing for the first 1000 or so queries which is way more than we’ll use. Get your API key and copy it to the clipboard. Then open your .env file and add it like so:

OPENAI_API_KEY=your_api_key_here
LANGCHAIN_API_KEY=your_api_key_here
WEATHER_API_KEY=your_api_key_here
TAVILY_API_KEY=your_api_key_here

Make sure not to use any spaces or quotation marks as usual. Then go ahead and save and close the .env file. Now open the setup_environment.py file and add a single tine to load the TAVILY_API_KEY to an environment variable like so:

import os
from datetime import date

from decouple import config


def set_environment_variables(project_name: str = "") -> None:
    if not project_name:
        project_name = f"Test_{date.today()}"

    os.environ["OPENAI_API_KEY"] = str(config("OPENAI_API_KEY"))

    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_API_KEY"] = str(config("LANGCHAIN_API_KEY"))
    os.environ["LANGCHAIN_PROJECT"] = project_name

    ##### Add only this line #####
    os.environ["TAVILY_API_KEY"] = str(config("TAVILY_API_KEY"))
    ##############################

    print("API Keys loaded and tracing set with project name: ", project_name)

Now save and close the setup_environment.py file.

Prep for our multi-agent team

For this example over the next two parts, we will be creating a multi-agent team that will generate travel itineraries for us in PDF format, with us simply inputting a query and getting a fully formed PDF travel itinerary out the other end including an image. We will have three different tools that we will need for the overall setup:

  1. An image generator: We already made one in the last part, so we can just import and reuse it, which is one of the nice things about LangChain tools.
  2. An internet search tool: In case the agent wants to search for more information. LangChain comes with some pre-built tools one of which is for Tavily Search, which is why we got the API key. We can just use this prebuilt here to save some time.
  3. A PDF generator: We will need a tool for our agents to be able to write PDF files and save them to disk. We will have to write this one ourselves before we can get started on our travel itinerary multi-agent team setup.

PDF writing tool

So let’s write up a quick PDF writing tool for our agents before we move on. Inside your tools folder make a new file named pdf.py:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py    โœจNew file
        ๐Ÿ“„ weather.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py

Inside this new pdf.py file get started with our imports:

import os
import uuid
from pathlib import Path

import pdfkit
from langchain.tools import tool
from markdown import markdown
from pydantic import BaseModel, Field

We import os to work with the operating system, uuid to generate unique filenames again, and Path to create a path towards an output folder to save our PDF files. The tool decorator from LangChain is the same one that we used last time and the Basemodel and Field from pydantic imports are for defining the input arguments interface for our function just like we did before.

The pdfkit library is going to let us save HTML to real output PDF files, but the downside is that it needs HTML as input to do the conversion. As HTML is more complex for our LLM agents to write which introduces more variables and I want to keep this example simple we will be using the markdown library to convert markdown to HTML for us. That way we can just tell our agents to write in markdown formatting (which is very simple) and our function will do markdown -> HTML -> PDF.

Both pdfkit and markdown are not installed by default so we will have to install them in our virtual environment. Open your terminal and run:

pipenv install markdown==3.6 pdfkit==1.0.0

That will take care of the basic Python library installs, but pdfkit needs an additional step, as it actually uses something called wkhtmltopdf under the hood to achieve the conversion. Head over to https://wkhtmltopdf.org/downloads.html and click the appropriate download for your platform. I am on Windows so I’ll select the Windows 64-bit download option:

Run the installer and select an install location. I’ll simply use the default C:\Program Files\wkhtmltopdf myself. Whichever install location you choose, take note of it and copy it somewhere as you will need it in a moment:

Let that run the install and when it’s done we can get back to the code! Below our imports in pdf.py we’ll add some quick setup:

PATH_WKHTMLTOPDF = r"C:\Program Files\wkhtmltopdf\bin\wkhtmltopdf.exe"
PDFKIT_CONFIG = pdfkit.configuration(wkhtmltopdf=PATH_WKHTMLTOPDF)

OUTPUT_DIRECTORY = Path(__file__).parent.parent / "output"

First of all, we do some setup for pdfkit by pointing it to the location of the wkhtmltopdf executable. This is the path I used on my Windows machine, you have to adjust this path to where you installed wkhtmltopdf on your machine so be sure that you use the correct path for you! After defining the path we can simply call pdfkit.configuration with the wkhtmltopdf argument set to the path we just defined. Later in the code when we actually write the PDF files, we can pass in this PDFKIT_CONFIG as an argument to use this configuration.

We then use the same trick as last time to get a path to a folder named output in our project root. This is where we will save our PDF files, but the folder doesn’t exist yet. Make sure you create it right now or the code will fail when it tries to save the PDF files later and you’ll be stuck debugging why it doesn’t work:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ output    โœจNew empty folder
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py
        ๐Ÿ“„ weather.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py

Good! Now back to our pdf.py file. Below the setup we’ll define our input arguments interface just like we did with our other tools so far:

class MarkdownToPDFInput(BaseModel):
    markdown_text: str = Field(
        description="Markdown text to convert to PDF, provided in valid markdown format."
    )

We simply define the input arguments as a single string that has to be in a valid markdown format. Once again make sure your description is a good one as the LLM will use it, it is not just for our own reference.

HTML generation

Let’s make the problem we need to solve smaller by first writing a separate function to generate the HTML from the markdown text so we can just feed HTML into pdfkit:

def generate_html_text(markdown_text: str) -> str:
    """Convert markdown text to HTML text."""
    markdown_text = markdown_text.replace("file:///", "").replace("file://", "")
    html_text = markdown(markdown_text)
    html_text = f"""
    <html>
    <head>
        <style>
            @import url('https://fonts.googleapis.com/css2?family=Roboto&display=swap');
            body {{
                font-family: 'Roboto', sans-serif;
                line-height: 150%;
            }}
        </style>
    </head>
    <body>
    {html_text}
    </body>
    </html>
    """
    return html_text

This function takes a markdown_text as string input. First, we’ll search the markdown text for any file:/// or file:// protocol declarations sometimes used when the model inserts our image in markdown. These are not needed so we simply replace them with an empty string "" as these would cause our image to not show up in the final generated PDF file. This kind of thing is something you just discover during your development work.

Now we can simply call the markdown function we imported on our markdown to get valid HTML based on the markdown. As I felt like doing some light styling I then wrapped the html_text in some basic HTML tags html, head, and body. In the head we can then include a style tag which allows us to load the Roboto font from Google using the css function @import url, set it as the font, and give some extra line height to our document to make the text more readable. This is the final html_text that will be returned with the markdown call converted HTML in the body portion. If you happen to be less familiar with HTML just copy what I have, it’s not really important for the course.

Finishing up the tool

Now it’s time to define the actual tool itself. Continue below:

@tool("markdown_to_pdf_file", args_schema=MarkdownToPDFInput)
def markdown_to_pdf_file(markdown_text: str) -> str:
    """Convert markdown text to a PDF file. Takes valid markdown as a string as input and will return a string file-path to the generated PDF."""
    html_text = generate_html_text(markdown_text)
    unique_id: uuid.UUID = uuid.uuid4()
    pdf_path = OUTPUT_DIRECTORY / f"{unique_id}.pdf"

    options = {
        "no-stop-slow-scripts": True,
        "print-media-type": True,
        "encoding": "UTF-8",
        "enable-local-file-access": "",
    }

    pdfkit.from_string(
        html_text, str(pdf_path), configuration=PDFKIT_CONFIG, options=options
    )

    if os.path.exists(pdf_path):
        return str(pdf_path)
    else:
        return "Could not generate PDF, please check your input and try again."

We start with the @tool decorator, once again providing a string name for our function and then the input argument interface we defined. The function itself takes a markdown_text as input and returns a string file path to the generated PDF file. We have a docstring that explains what the function does and what it expects as input as the LLM is going to use this.

We then call our generate_html_text function on the markdown_text to get the html_text we need and generate a unique ID for the PDF file name, creating a path to the PDF file in our OUTPUT_DIRECTORY folder. We then define some options for pdfkit to use when generating the PDF. These are just some basic options that I found to work ok for our example, we don’t want to get sidetracked here by spending too much time on this as it is not the focus of this tutorial.

Finally, we call pdfkit.from_string with the html_text, the path to the PDF file in str format instead of a Path object, the configuration we set up atop this file, and the options we just defined. If the PDF file is successfully generated, which we can check with the os.path.exists function to see if the file exists or not, we return the path to the PDF file. If it does not exist we return a message saying that the PDF could not be generated. We purposely do not raise an error but send a string response as the agent can receive this, try to find the error, fix it, and try again.

PDF tool test run

Now let’s add a quick test at the bottom of our file:

markdown_dummy_text = """
# Title
This is a test of the markdown to PDF function.
## Subtitle
This is a test of the markdown to PDF function.
### Sub-subtitle
This is a test of the markdown to PDF function. This is a paragraph with random text in it nunc nunc tincidunt nunc, nec.
S'il vous plaรฎt.
"""

if __name__ == "__main__":
    print(markdown_to_pdf_file(markdown_dummy_text))

There are a couple of headings here and some French with non-standard characters like in “plaรฎt” to make sure it also works with special characters. Now go ahead and run your file (Reminder: make sure you created the output folder!). Close the printer message popup if you get one, we’ll just ignore it for now. You should see a new PDF file in your output folder. Go ahead and open it:

It’s not perfect by any means, but it works well enough for our LangGraph example purposes. As LangGraph is the focus here we will not spend any more time perfecting the details of this particular tool.

One last step though to fix the imports. Open up the tools/__init__.py file and fix the code to:

from .image import generate_image
from .weather import get_weather
from .pdf import markdown_to_pdf_file

Save and close that so we can have the nicer imports in our main code. That’s it for the preparation, this part is slightly shorter by design as the next one will be extra long. It is finally time to set up and run our multi-agent team! So let’s get to the fun stuff, I’ll see you there! ๐Ÿš€

Multi-Agent LangGraph: Setting Up Our Multi-Agent Team

Welcome back to part 5, where we’ll set up our multi-agent team. So buckle up and let’s jump right in. Create a new file named multi_agent.py in your project root:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ output
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py
        ๐Ÿ“„ weather.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ multi_agent.py    โœจNew file
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py

Open up the multi_agent.py file and start with the imports:

import functools
import operator
from typing import Annotated, Sequence, TypedDict

from colorama import Fore, Style
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

from setup_environment import set_environment_variables
from tools import generate_image, markdown_to_pdf_file

We have a lot of imports again, many of which will be familiar. We import our own two tools from the tools folder and also the TavilySearchResults from the langchain_community tools. There are some new imports like functools and the AgentExecutor but we’ll cover each one and how they are used as we go along.

Environment variables and constants

Let’s load up our environment variables and create a bunch of constants we’ll need:

set_environment_variables("Multi_Agent_Team")

TRAVEL_AGENT_NAME = "travel_agent"
LANGUAGE_ASSISTANT_NAME = "language_assistant"
VISUALIZER_NAME = "visualizer"
DESIGNER_NAME = "designer"

TEAM_SUPERVISOR_NAME = "team_supervisor"
MEMBERS = [TRAVEL_AGENT_NAME, LANGUAGE_ASSISTANT_NAME, VISUALIZER_NAME]
OPTIONS = ["FINISH"] + MEMBERS

We load our environment variables and set the project name to Multi_Agent_Team. We then define a bunch of constants for the names of our agents and the team supervisor. These are just strings but as we’ll have to type each of these strings multiple times it will be very annoying if we change or mistype one, hence storing these in a single place up top is the way to go.

Note that we have the travel_agent, language_assistant, and visualizer inside a list called members and we have the designer and team_supervisor on the outside. We also imported the END node we used last time. That leaves us with a situation like this:

The list named OPTIONS is going to be the potential options the team_supervisor can choose from each step along the way, so it has all three members in the team + the "FINISH" option to indicate this particular team has finished its work.

Add two more final constants below:

TAVILY_TOOL = TavilySearchResults()
LLM = ChatOpenAI(model="gpt-3.5-turbo-0125")

We have the TAVILY_TOOL which is the Tavily search tool we imported from the langchain_community tools and the LLM which is gpt-3.5-turbo-0125 here but feel free to use GPT-4-turbo instead if you want.

Agent creator function

We’re going to be creating a lot of agents here, so let’s create a function to handle the repetitive work of creating an agent:

def create_agent(llm: BaseChatModel, tools: list, system_prompt: str):
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore
    return agent_executor

We define a function named create_agent which takes an llm of the type BaseChatModel. This is just a type hint but it was part of our imports for clarity. BaseChatModel is the base class for all chat models in LangChain, including the ChatOpenAI variation we use here. You can pass any LLM you want and have different nodes of the same graph run on completely different LLMs. The other arguments are a list of tools and a system_prompt string.

We then declare a prompt_template using the ChatPromptTemplate.from_messages method that we used all the way back in part 1, but this time we use multiple messages. We have a "system" message that is the system prompt string passed into the function and then we have two placeholders for the messages and agent_scratchpad variables that we have seen before. The MessagesPlaceholder, as the name suggests, is just a placeholder for both of these so we can insert them later using the names we have defined under variable_name.

We then use the create_openai_tools_agent just like we did in part 3, but this time we go one step further and create an AgentExecutor in the step below. This AgentExecutor comes with LangChain and will basically combine the agent and the executor nodes we had in the previous part into a single node, handling the function call logic we did in the previous part for us! It takes an agent and a list of tools for that agent to use as arguments.

The # type: ignore comment is in case you use a type checker as it will complain here, and this series is not about type checking so we won’t go too deep into it as it’s no big deal here. We then return the agent_executor we created.

Agent state object

Now let’s declare the state object that we will be passing around in this particular graph:

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next: str

This time we need two entries. The first is the messages which is a sequence of BaseMessage objects which again are just messages like ("human", "Hello, how are you doing?"), or ("ai", "I'm doing well, thanks!"),. We define it as a Sequence, so like a list or a tuple of these messages, and the operator.add again indicates that we will add to this sequence of messages with each step. Annotated is just used as it allows us to add the annotation of operator.add.

The second entry is the next which is a string that will be the name of the next agent to call. This is the agent that the team_supervisor will decide to call next based on the state object it receives and then we can use this field to see which agent to route to next. This field can just be overwritten as we don’t need the history, so a single string without any fancy annotations will do fine here.

Agent node function

Now let’s define a function that represents one of these agent nodes in general:

def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

The function takes the state object, an agent, and the string name for the agent (the ones we defined up top as constants). Then we simply need to invoke the agent with the state and then keeping with the promise we made above in the AgentState object we defined the node needs to return a messages object with a message in it. We will simply use a HumanMessage, as it doesn’t really matter who the message comes from, and get the result from result["output"] which is the output of the agent’s call.

Team supervisor’s next member choice

Next, we’re going to need a way to have the team_supervisor choose which agent to invoke next. The easiest way to do this reliably is to pretend this is a function that the agent supervisor has to call for us. The only possible input arguments are the names of our agents and we tell the team_supervisor that it must call nonexistent_function(agent_name) to invoke the agent.

This is a bit of a hack, but it makes it very easy for us to extract the agent_name consistently and easily to see which agent node needs to run next. We will also include one extra option of “FINISH” so the team_supervisor can tell us when it’s done and needs to break out of the team. Doing this will also let us use the JsonOutputFunctionsParser later on in our code, as the function call will be sent in a correct JSON format, making the parsing of the output easier.

For this function that doesn’t actually exist, we’re going to define an old-school vanilla OpenAI function description that describes how the function works to the LLM team supervisor. Add the following variable:

router_function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "next",
                "anyOf": [
                    {"enum": OPTIONS},
                ],
            }
        },
        "required": ["next"],
    },
}

This is actually JSON Schema vocabulary, but is quite readable. We define the name of the function as route and give it a description of what the function does. We then define the parameters that the function takes, giving the parameter object a title of routeSchema and defining that it is an object. Then we define the properties of this object, which is just a single property named next. This property has a title of next and the options available are anyOf the enumerate (list) of OPTIONS we defined up top. We then define that the next property is required.

This JSON Schema style is what the OpenAI API normally uses for function/tool calls, but LangChain has done this under the hood for the functions we have used so far. Again, this function will not actually exist, but that doesn’t stop us from feeding it to the LLM and extracting the next property from the arguments the LLM provides for us.

Team supervisor system prompt

Now let’s create a secondary file to store our prompt system setup messages as we’re going to be using quite a lot of them here. Create a new file named multi_agent_prompts.py in your project root:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ output
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py
        ๐Ÿ“„ weather.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ multi_agent.py
    ๐Ÿ“„ multi_agent_prompts.py    โœจNew file
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py

We’ll use this file to store the prompt string variables for the system messages our agents will use. If you’re watching the video tutorial version of this please be advised that there is a written blog version of this tutorial where you can copy these prompts so you don’t have to type them all over again, as we have a lot more of them coming. Let’s start with the team supervisor. Inside the multi_agent_prompts.py file add:

TEAM_SUPERVISOR_SYSTEM_PROMPT = """
You are a supervisor tasked with managing a conversation between the following workers: {members}. Given the following user request, respond with the worker to act next. Each worker will perform a task and respond with their results and status. The end goal is to provide a good travel itinerary for the user, with things to see and do, practical tips on how to deal with language difficulties, and a nice visualization that goes with the travel plan (in the form of an image path, the visualizer will save the image for you and you only need the path).

Make sure you call on each team member ({members}) at least once. Do not call the visualizer again if you've already received an image file path. Do not call any team member a second time unless they didn't provide enough details or a valid response and you need them to redo their work. When finished, respond with FINISH, but before you do, make sure you have a travel itinerary, language tips for the location, and an image file-path. If you don't have all of these, call the appropriate team member to get the missing information.
"""

So we have some basic instructions for the team supervisor on how to manage the team here. We have the placeholder {members} in there twice which will be replaced with the actual list of members. We tell it we want a travel itinerary with things to do and sightseeing, language tips, and a visualization for the itinerary. The prompt here is far from perfect and you can tweak it further if you like.

Save the multi_agent_prompts.py file and let’s get back to the multi_agent.py file. First of all, add an extra import up top with the other imports:

#... all the other imports ...

from multi_agent_prompts import TEAM_SUPERVISOR_SYSTEM_PROMPT

Note that we could just use from multi_agent_prompts import * as the * will simply import everything from the file, even the variables we add later, but this is a bad practice as it makes it hard to see where the variables come from and leads to namespace pollution. It’s better to explicitly define and keep track of what you’re importing or sooner or later you’re going to have multiple variables with the same name and you won’t know where they come from.

Team supervisor prompt template

Now scroll all the way back down past the router_function_def and add the following code to define our team supervisor’s prompt template manually as it will be different from all the other agents:

team_supervisor_prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", TEAM_SUPERVISOR_SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=", ".join(OPTIONS), members=", ".join(MEMBERS))

We use the same ChatPromptTemplate.from_messages method we used before, but this time we have three messages. The first is the TEAM_SUPERVISOR_SYSTEM_PROMPT we defined in the multi_agent_prompts.py file. The second is a MessagesPlaceholder for the messages variable and the third is a short system message that reminds the team supervisor what it’s task is and what options it has available to choose from.

This team supervisor prompt template will need 3 variables to be filled in and used properly.

  • The first is inside the TEAM_SUPERVISOR_SYSTEM_PROMPT where we used the members placeholder twice.
  • The second one is the messages for the MessagesPlaceholder in the middle.
  • The third is the options for the options placeholder in the last message.

We have two of these available, namely the options and the members, but we don’t have the messages yet. the .partial chained on method will let us fill in the two parts that we have and leave the messages part to be added later, so we can go ahead and pass our OPTIONS to the options placeholder and the MEMBERS to the members placeholder ahead of time using this partial filling in method.

Note that we use the join method on the OPTIONS and MEMBERS lists to turn them into a single string with the members separated by a comma and a space as we cannot pass list variables to LLMs.

Team supervisor node

So the team supervisor is basically going to act like a router between our agents, deciding who is up next. Remember in part 1 where we used LCEL with the | pipe operator to create chains by piping a prompt into an LLM and then into an output parser? These simple vanilla LangChain chains can also be used as nodes in LangGraph. As the team supervisor node is going to be special we will use our part 1 vanilla LangChain knowledge to simply chain it together manually:

team_supervisor_chain = (
    team_supervisor_prompt_template
    | LLM.bind_functions(functions=[router_function_def], function_call="route")
    | JsonOutputFunctionsParser()
)

So we simply define the team_supervisor_chain as the prompt template we just made for it, then we pipe that into the LLM, and pipe that into a JsonOutputFunctionsParser. As we’re using a function here we can use the JSON output parser to extract the next property from the arguments the LLM provides for us.

The LLM here uses the bind_functions method to bind the router_function_def JSON Schema we defined as the available functions for this LLM call, and by passing in the second optional argument function_call="route" we tell the LLM that it MUST call the route function we defined earlier, meaning we are actually forcing it to call this function and not do anything else as this is its only purpose. Remember we added an entry in the AgentState to store the next parameter.

The system prompts for our other agents

Ok, now we need to create the agents that will make up the rest of our graph. These are going to be a lot easier as we’ll be able to use the create_agent function we wrote earlier. But first, we need some system setups which are going to be unique for each agent. Let’s move back over to the multi_agent_prompts.py file and add the following below the existing TEAM_SUPERVISOR_SYSTEM_PROMPT, starting with the travel agent:

TRAVEL_AGENT_SYSTEM_PROMPT = """
You are a helpful assistant that can suggest and review travel itinerary plans, providing critical feedback on how the trip can be enriched for enjoyment of the local culture. If the plan already includes local experiences, you can mention that the plan is satisfactory, with rationale.

Assume a general interest in popular tourist destinations and local culture, do not ask the user any follow-up questions.

You have access to a web search function for additional or up-to-date research if needed. You are not required to use this if you already have sufficient information to answer the question.
"""

So we just have some basic instructions here, and notice how we say that if the plan already includes local experiences the agent can mention that the plan is satisfactory already, to make sure we’re not forcing it to do pointless work. The second paragraph is to stop it from asking questions and expecting an answer from the user, it should just help us without asking stuff.

Finally, we tell it that we give it access to a web search function to do more research if it needs to, but it won’t use these much as it has most travel info hard-wired into the LLM already. (We’ll use these search functions more extensively in the last part). I’ve taken some inspiration for these agents and prompts from the Autogen demo agents here, but this is just a starting point, and these can be tweaked much further.

Now for the language assistant:

LANGUAGE_ASSISTANT_SYSTEM_PROMPT = """
You are a helpful assistant that can review travel plans, providing feedback on important/critical tips about how best to address language or communication challenges for the given destination. If the plan already includes language tips, you can mention that the plan is satisfactory, with rationale.

You have access to a web search function for additional or up-to-date research if needed. You are not required to use this if you already have sufficient information to answer the question.
"""

This is basically the same but with a focus on language tips instead of travel itinerary plans. Let’s move on to the visualizer:

VISUALIZER_SYSTEM_PROMPT = """
You are a helpful assistant that can generate images based on a detailed description. You are part of a travel agent team and your job is to look at the location and travel itinerary and then generate an appropriate image to go with the travel plan. You have access to a function that will generate the image as long as you provide a good description including the location and visual characteristics of the image you want to generate. This function will download the image and return the path of the image file to you.

Make sure you provide the image, and then communicate back as your response only the path to the image file you generated. You do not need to give any other textual feedback, just the path to the image file.
"""

This one is a bit different as it’s going to generate an image for us. We tell it that it should only provide the path to the image file and not any other feedback. This is of course because the image generation tool that we wrote ourselves will save the image to disk and return the path to the image file, so we don’t need any other feedback from the agent other than the path which means that the image generation was successful.

Now we have one last agent’s system prompt to define, the designer, which is going to exist outside of our team of three agents above. We will also need the path to the images folder in our project to insert into this prompt. First scroll all the way back up to the top of the multi_agent_prompts.py file, and add the following import:

from tools.image import IMAGE_DIRECTORY

Now scroll all the way back down again and add the designer’s system prompt, this time using a multi-line f string:

DESIGNER_SYSTEM_PROMPT = f"""
You are a helpful assistant that will receive a travel itinerary in parts. Some parts will be about the travel itinerary and some will be the language tips, and you will also be given the file path to an image. Your job is to call the markdown_to_pdf_file function you have been given, with the following argument:

markdown_text: A summary of the travel itinerary and language tips, with the image inserted, all in valid markdown format and without any duplicate information.

Make sure to use the following structure when inserting the image:
![Alt text]({str(IMAGE_DIRECTORY)}/image_name_here.png) using the correct file path. Make sure you don't add any stuff like 'file://'.

Start with the image and itinerary first and the language tips after, creating a neat and organized final travel itinerary with the appropriate markdown headings, bold words and other formatting.
"""

We explain that it’s function is to call the markdown_to_pdf_file function we wrote passing in a full markdown summary with the image inserted as well. We give it specific instructions on how to format the image link in the markdown so it will work with our converter, and finally give it some last instructions on the structure we want.

Inside your multi_agent_prompts.py file you now have the following constants:

TEAM_SUPERVISOR_SYSTEM_PROMPT = ...
TRAVEL_AGENT_SYSTEM_PROMPT = ...
LANGUAGE_ASSISTANT_SYSTEM_PROMPT = ...
VISUALIZER_SYSTEM_PROMPT = ...
DESIGNER_SYSTEM_PROMPT = ...

Creating our agents and nodes

Go ahead and save and close the multi_agent_prompts.py file and let’s get back to the multi_agent.py file. First lets update our import up top with the other imports, changing it like this:

#... all the other imports ...

from multi_agent_prompts import (
    TEAM_SUPERVISOR_SYSTEM_PROMPT,
    TRAVEL_AGENT_SYSTEM_PROMPT,
    LANGUAGE_ASSISTANT_SYSTEM_PROMPT,
    VISUALIZER_SYSTEM_PROMPT,
    DESIGNER_SYSTEM_PROMPT,
)

Then go ahead and scroll all the way back down to the bottom of the file and let’s start creating some agents and nodes! First up is the travel agent:

travel_agent = create_agent(LLM, [TAVILY_TOOL], TRAVEL_AGENT_SYSTEM_PROMPT)
travel_agent_node = functools.partial(
    agent_node, agent=travel_agent, name=TRAVEL_AGENT_NAME
)

First we create the travel_agent by calling our create_agent function and passing in the LLM, a list with the TAVILY_TOOL in it as our list of tools, as we promised it an internet tool if it needed one, and the TRAVEL_AGENT_SYSTEM_PROMPT. We now have our travel agent / executor.

To get the travel agent’s node we need to use the agent_node function we defined before, which needs three arguments, the agent, the state and the name of the agent in string format. We have the agent and the name already, but the state will only be available at runtime. To solve this problem we can use the functools.partial function to create a new function that has the agent and name already filled in, and then we can pass in the state at runtime.

If you’re unfamiliar with functools.partial, it basically works like this:

########### Example, not part of the code ############
# Original function
def multiply(x, y):
    return x * y

# Create a new function that multiplies by 2
multiply_by_two = functools.partial(multiply, x=2)

result = multiply_by_two(3)
print(result)  # Output: 6

So it takes a function and creates a new function based on the original with a portion of the arguments already filled in, reducing the number of arguments the function takes in it’s new form. This is very useful as we now have our complete travel_agent_node that needs only the state object to be passed in for it to work.

Now in exactly the same manner we can create our language_assistant, visualizer, and designer agents and nodes:

language_assistant = create_agent(LLM, [TAVILY_TOOL], LANGUAGE_ASSISTANT_SYSTEM_PROMPT)
language_assistant_node = functools.partial(
    agent_node, agent=language_assistant, name=LANGUAGE_ASSISTANT_NAME
)

visualizer = create_agent(LLM, [generate_image], VISUALIZER_SYSTEM_PROMPT)
visualizer_node = functools.partial(agent_node, agent=visualizer, name=VISUALIZER_NAME)

designer = create_agent(LLM, [markdown_to_pdf_file], DESIGNER_SYSTEM_PROMPT)
designer_node = functools.partial(agent_node, agent=designer, name=DESIGNER_NAME)

The language assistant takes the TAVILY_TOOL, while our visualizer needs the generate_image and the designer the markdown_to_pdf_file tool. We then create the nodes for each of these agents in the same way we did for the travel agent above, passing in their respective names using the ...NAME constants we defined up top.

Creating the graph

Time to create our graph and the nodes:

workflow = StateGraph(AgentState)
workflow.add_node(TRAVEL_AGENT_NAME, travel_agent_node)
workflow.add_node(LANGUAGE_ASSISTANT_NAME, language_assistant_node)
workflow.add_node(VISUALIZER_NAME, visualizer_node)
workflow.add_node(DESIGNER_NAME, designer_node)
workflow.add_node(TEAM_SUPERVISOR_NAME, team_supervisor_chain)

We initialize the StateGraph passing in our AgentState format we defined. Then we simply create a node for each agent passing in the name first, and the actual node second. Note that we’ve used these ...NAME variables several times now, which is why we defined them up top as constants to give them only a single point of definition instead of repeating strings all over the place.

Now that we have the nodes let’s start building some connections:

for member in MEMBERS:
    workflow.add_edge(member, TEAM_SUPERVISOR_NAME)

workflow.add_edge(DESIGNER_NAME, END)

For every member in the list of team MEMBERS we add an edge back to the team supervisor, as it will decide where to go next between each step. We also add an edge from the designer to the END node as the designer is the last step in our graph and will exist outside of the team.

So far we have this, and these are all hard edges with no conditions. Now it is time for us to add some conditional edges:

conditional_map = {name: name for name in MEMBERS}
conditional_map["FINISH"] = DESIGNER_NAME
workflow.add_conditional_edges(
    TEAM_SUPERVISOR_NAME, lambda x: x["next"], conditional_map
)

We create a conditional_map dictionary that maps each member to itself, and then we add a key "FINISH" that maps to the DESIGNER_NAME. So if the team supervisor calls on the "visualizer" this will simply map like {"visualizer": "visualizer"} but the one exception is the {"FINISH": "designer"} mapping.

We then call the add_conditional_edges method on the workflow object. This method takes the start point, so we pass in the TEAM_SUPERVISOR_NAME, a function that will return a value, and then a mapping that will map that value to the next desired node.

The function is a lambda that takes the state object as input and simply returns the state’s next key that the team supervisor has put in there. The conditional_map is the mapping we defined above, so if the team supervisor calls on a team member it will map to that team member’s node, but if it calls "FINISH" it will map to the "designer" node.

Now set the entry point and compile the graph:

workflow.set_entry_point(TEAM_SUPERVISOR_NAME)

travel_agent_graph = workflow.compile()

Our completed graph now looks like this:

Where the white lines represent the fixed edges and the dotted lines represent conditional ones. Now let’s actually give this a test run and see what happens!:

for chunk in travel_agent_graph.stream(
    {"messages": [HumanMessage(content="I want to go to Paris for three days")]}
):
    if "__end__" not in chunk:
        print(chunk)
        print(f"{Fore.GREEN}#############################{Style.RESET_ALL}")

So we’re going to call stream on the travel_agent_graph and pass in a dictionary with the messages key and a list with a single HumanMessage object in it, saying that we want to visit Paris. for three days. We then loop over the chunks and print them out, and then print a line of #s in green to visually separate the chunks.

Now go ahead and run this and let’s see what happens! You may see some printer message popup, again just click X on it if it pops up for now. When it’s done running have a look in your output folder for the final result:

That is pretty darn cool right! Our whole team of AI agents is working together to do our bidding without any work on our part! Everything worked exactly as expected with the routing and everything, which you can confirm in your LangSmith dashboard (https://smith.langchain.com/) as well by checking out the trace for the run:

We can see that after each step the system returns to the team supervisor and at the end it breaks out of the team towards the designer. I’ve done a bunch more test runs to verify that it works well and here are some example runs for other destinations:

Remember that I’ve been using the gpt-3.5-turbo-0125 model all this time. You can easily swap out any of the models for gpt-4-turbo if you want more detail, or if you have some trouble with a specific node. Say the designer has trouble working consistently, you could just swap out only that node for a different model with a higher quality and leave the rest as is.

You can literally create just about any combination of agents, nodes, edges, and conditional edges you want. The combination possibilities are mind-boggling. We decided to have one agent outside of the team here, no problem! We can also have 2 teams or even more if we want, each with their own manager. Your imagination is the limit here.

That’s it for part 5! In the next and last part, we’ll take a look at writing and integrating asynchronous tools into our systems. I’ll see you there!

Multi-Agent LangGraph – Web Research and Asynchronous Tools

Hello and welcome back to the last part of the LangGraph/LangChain/LangSmith course. In this part, we’ll learn how to deal with asynchronous tools by building a graph that will do some web research for us, where one of the tools is going to be visiting several websites at once to feed info back into the graph.

This type of asynchronous action is very helpful when there are multiple steps or actions that can be performed at the same time for optimization as it will save a lot of time and make the user experience much better. It is a bit different to set up and work with though, which is why we’ll be going through it in this part.

I will try to cover the bare basics of async Python programming here, as it can look quite confusing and I want all skill levels to be able to follow along. If you are already very familiar with async programming the level of explanation may be a bit excessive for you and you can probably skip over some of the explanations and just look at the code.

Web research tool

Let’s start by building our tool as usual. This tool is going to visit a bunch of web URLs at the same time (asynchronously) and return the HTML content of each page. We will need to install the BeautifulSoup library to parse the HTML content of the pages. Run the following command in the terminal:

pipenv install beautifulsoup4==4.12.3

Then go ahead and create a new file called web.py in the tools directory:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ output
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py
        ๐Ÿ“„ weather.py
        ๐Ÿ“„ web.py    โœจNew file
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ multi_agent.py
    ๐Ÿ“„ multi_agent_prompts.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py

In the web.py file let’s start with our imports as usual:

import asyncio
import json
import sys

import aiohttp
from bs4 import BeautifulSoup
from langchain.tools import tool
from pydantic import BaseModel, Field

We import asyncio to work with asynchronous code, aiohttp to make HTTP requests asynchronously, and BeautifulSoup to parse the HTML content of the pages. The tool decorator and pydantic imports are the same as for the other tools and json is to return the JSON responses in string format.

Async and event loops

First of all, we’ll use the sys import to set the type of event loop to use for the asynchronous code:

if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Without going into too much detail, there is a known issue with the Python asyncio library on Windows specifically that happens when the Proactor event loop (the default on Windows) is closed while there are still outstanding tasks. It doesn’t affect the correct execution of the code, but something on Windows + aysncio + LangChain/LangGraph triggers it. We’ll use the selector event loop policy to avoid this issue (this is only needed/triggers if you’re on Windows.).

While this tutorial part is way too short to really go in-depth on Python’s asynchronous programming, we’ll try to cover the basics as we go along. Basically, we get an event loop, and we can put tasks in there. Normally a task like fetching a webpage would block the code until it’s done, but with asyncio we can put it in the event loop and continue with other tasks while it’s being fetched. This allows us to run multiple operations at the same time.

This is not to be confused with multi-threading or multi-processing, which are quite different in nature:

  • Multi-processing: is about spreading tasks over a computer’s CPU cores, and is well suited for tasks that require lots of mathematical computations.
  • Multi-threading: is about running multiple threads in the same process, and is well suited for tasks that are I/O bound (like fetching webpages).
  • Asynchronous programming: is a single-process, single-threaded design that uses coroutines to handle multiple tasks concurrently. Async functions are able to sort of pause and resume their execution, allowing other tasks to run in the meantime during this pause.

Async programming in Python is very similar to the JavaScript async/await pattern, and it’s a great way to handle I/O-bound tasks like fetching web pages. If you’re a bit new to this all, just keep going and you’ll get a feel for how it works.

Parsing HTML content

First, we’ll write a very basic function that uses BeautifulSoup to parse some HTML content:

def parse_html(html_content: str) -> str:
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in ["nav", "footer", "aside", "script", "style", "img", "header"]:
        for match in soup.find_all(tag):
            match.decompose()

    text_content = soup.get_text()
    text_content = " ".join(text_content.split())
    return text_content[:8_000]

This function takes the HTML content of a webpage as a string and returns a string with the text content of the page. First we instantiate a new BeautifulSoup object passing in the html.parser string to select the parser. We then make a list of all the HTML tags we want to filter out, namely the navigation, footer, aside, script, style, image and header tags. We’re interested in the main content and don’t want all this pollution.

For each tag in this list of HTML tags, we run soup.find_all(tag) to find all the tags with that name in the HTML content, which returns all the matches for that tag. This allows us to loop over each match in soup.find_all(tag) and call match.decompose() to remove the tag from the HTML content.

We then get the text content of the page with soup.get_text() to remove as much HTML and unneeded stuff as possible from what was left.

Then we call text_content.split() to split the text content into a list of words, which has the side effects of removing long sequences of whitespace, tab, and newline characters. We then join the list of words back into a string with " ".join so that we’re left with only a single space between all words to save space. The LLM does not care about formatting and sending tons of whitespace to it is just a waste of space.

Finally, we return the first 8,000 characters of the text content, to make sure we don’t exceed the context limit if we load like 5 or 6 pages at once. You can set this higher if you use GPT-4-turbo instead of 3.5-turbo

Fetching webpages

Notice that the parse_html function is just a regular synchronous function. Now let’s get started on the asynchronous part. The first thing we’ll do is write a function to fetch the HTML content of a single webpage, and then we can just call this function multiple times to fetch the content of multiple pages at once.

async def get_webpage_content(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            html_content = await response.text()

    text_content = parse_html(html_content)
    print(f"URL: {url} - fetched successfully.")
    return text_content

First, we declare our async function using async def instead of the normal def. This will allow us to later call await on this function to make the code non-blocking and run other tasks while we wait for the response. We take a URL string and return a string.

Where we would normally fetch a webpage with the requests library, here we need to use aiohttp which is an asynchronous HTTP client/server library for Python that allows us to write this non-blocking code. The ClientSession object represents a single web session, so you could set headers or cookies here that apply to all requests in this session.

The whole thing can be used as a context manager giving us the async with aiohttp.ClientSession() as session syntax and any indented code afterward now takes place inside this context. Then we call get(url) on the session object and use that as a context manager in the same exact manner as the line above it.

The line after calls await on the response.text() and will then save this in the html_content variable. This await keyword is the magic, and whenever you see this keyword it sort of pauses this code, as time is needed to fetch the webpage. While this is happening, other tasks can run in the event loop.

When the html_content has finished fetching, we move outside of the two async context managers and call our parse_html function to get the text content of the page. We then print a message to the console that the URL was fetched successfully and return the text content.

Note that we could easily edit the above function to fetch the whole list of URLs we have inside the same ClientSession context manager, but as the overhead to calling this function a couple of times is minimal, I’ll just keep it as is for now.

Another fair point to make is that the parse_html function is technically blocking non-async code, but as it doesn’t take long to run at all, it’s fine to keep it here. The main time-waster is the fetching of the webpages and we made that asynchronous.

Input arguments and the tool

Before we get to the actual tool itself we need to make sure to define our pydantic object with the input arguments for the tool:

class ResearchInput(BaseModel):
    research_urls: list[str] = Field(description="Must be a list of valid URLs.")

No surprises here, we just want a list of URLs in string format. We’ve used this type of object several times before.

Now let’s write our tool, starting with the first half:

@tool("research", args_schema=ResearchInput)
async def research(research_urls: list[str]) -> str:
    """Get content of provided URLs for research purposes."""
    tasks = [asyncio.create_task(get_webpage_content(url)) for url in research_urls]

We use the @tool decorator to define our tool, passing in the name and the argument schema as always. We declare the function making sure to use async def, and we declare the same research_urls argument as we defined in the ResearchInput class. Again mind the docstring description for the LLM to use.

Then we use a list comprehension, let’s read it from the right to the left. for each url in the list of research_urls, we call asyncio.create_task(get_webpage_content(url)) to create a task for each URL. The asyncio.create_task() function schedules the coroutine to run on the event loop and returns a Task object. However, it doesn’t automatically await the task.

What this means is that it will create our async task and also start it for us, but it won’t await it, or wait for it to finish, which would block the code. We are left with a list full of these task objects of tasks that are currently running but not yet finished.

Let’s finish our tool:

@tool("research", args_schema=ResearchInput)
async def research(research_urls: list[str]) -> str:
    """Get content of provided URLs for research purposes."""
    tasks = [asyncio.create_task(get_webpage_content(url)) for url in research_urls]
    contents = await asyncio.gather(*tasks, return_exceptions=True)
    return json.dumps(contents)

The asyncio.gather() function is used to schedule multiple tasks to run and waits for all of them to complete. It will wait for all our tasks from the previous line to fetch their web pages and then gather the results. This is why we await this function, and then save the results in contents. *tasks is a way to unpack the list of tasks into separate arguments passing them into the function.

The return_exceptions parameter in asyncio.gather() determines how exceptions are handled. If return_exceptions is set to False, gather() will immediately raise the first exception it encounters. When set to True, instead of raising exceptions, it will return them in the result list so that contents will be a list of results or exceptions. We use this as we want to go ahead and fetch the rest of the pages even if one fails.

Finally, dump the response to a JSON string and return it, as naturally, LLMs need string input.

Testing the tool

Now let’s add a quick test to this file to test our tool in isolation and make sure there are no problems:

if __name__ == "__main__":
    import time

    TEST_URLS = [
        "https://en.wikipedia.org/wiki/SpongeBob_SquarePants",
        "https://en.wikipedia.org/wiki/Stephen_Hillenburg",
        "https://en.wikipedia.org/wiki/The_SpongeBob_Movie:_Sponge_Out_of_Water",
    ]

    async def main():
        result = await research.ainvoke({"research_urls": TEST_URLS})

        with open("test.json", "w") as f:
            json.dump(result, f)

    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"Async time: {end_time - start_time} seconds")

We’ve covered the if __name__ == "__main__": block before, so only if we run this file directly will the code inside this block run. We define a list of test URLs to use and then define an async function called main() to run our tool with these test URLs. Instead of invoking the tool as we normally do we now use ainvoke for the async version, and we have to await the result. This is why the main function is async as well.

We then open a file called test.json in write mode and dump the result to it so we can have a quick look to check if the output is as expected. Finally we run the main() function with asyncio.run(main())

asyncio.run is a useful function that creates a new event loop, runs the given coroutine which is main in our case, closes the loop, and then returns the result. This makes it a convenient way to run async code from a synchronous context as it handles the whole event loop thing for us.

I’ve also sneaked a start and end timer in there using time.time() to see how long it takes to run the async code.

Now go ahead and run the web.py file and you’ll see something like this:

URL: https://en.wikipedia.org/wiki/The_SpongeBob_Movie:_Sponge_Out_of_Water - fetched successfully.
URL: https://en.wikipedia.org/wiki/Stephen_Hillenburg - fetched successfully.
URL: https://en.wikipedia.org/wiki/SpongeBob_SquarePants - fetched successfully.
Async time: 2.9387967586517334 seconds

I have also tried the synchronous normal version of this code using the requests library, and it took over 7 seconds, so we have a considerable time save here, and this is with only 3 URLs. If I increase the number of URLs to just 6, the async version takes about 4 seconds, while the synchronous version takes like 14.

If you open the test.json file that has been created you should see something like the following that goes on for quite a while:

"[\"SpongeBob SquarePants - Wikipedia Jump to content From Wikipedia, the free encyclopedia American animated television series This article is about the television series.........

Web research graph setup

We have just written our first async tool! Now let’s put it to good use and write up a quick web research graph. In your root folder create two new files called web_research.py and web_research_prompts.py:

๐Ÿ“‚ FINX_LANGGRAPH
    ๐Ÿ“‚ images
    ๐Ÿ“‚ output
    ๐Ÿ“‚ tools
        ๐Ÿ“„ __init__.py
        ๐Ÿ“„ image.py
        ๐Ÿ“„ pdf.py
        ๐Ÿ“„ weather.py
        ๐Ÿ“„ web.py
    ๐Ÿ“„ .env
    ๐Ÿ“„ langchain_basics.py
    ๐Ÿ“„ multi_agent.py
    ๐Ÿ“„ multi_agent_prompts.py
    ๐Ÿ“„ Pipfile
    ๐Ÿ“„ Pipfile.lock
    ๐Ÿ“„ setup_environment.py
    ๐Ÿ“„ simple_langgraph.py
    ๐Ÿ“„ web_research.py          โœจNew file
    ๐Ÿ“„ web_research_prompts.py    โœจNew file

The graph here will be reasonably simple, having two agents. One of them will use Tavily to do a basic search query, and the other one will use our async tool to do more in-depth research on the URLs provided by the first Tavily agent. You know the drill by now, so we’ll just define our system prompts for the agents before we get started on the main file. If you’re watching the video version of this tutorial make sure you open up the written version so you can more easily copy these. Start by opening up the web_research_prompts.py file.

We’ll get started with the Tavily agent’s system prompt first:

TAVILY_AGENT_SYSTEM_PROMPT = """
You are a search agent. Your tasks is simple. Use your tool to find results on the internet for the user query, and return the response, making sure to include all the sources with page title and URL at the bottom like this example:

1. [Title 1](https://www.url1.com/whatever): ...
2. [Title 2](https://www.url2.com/whatever): ...
3. [Title 3](https://www.url3.com/whatever): ...
4. [Title 4](https://www.url4.com/whatever): ...
5. [Title 5](https://www.url5.com/whatever): ...

Make sure you only return the URLs that are relevant for doing additional research. For instance:
User query Spongebob results from calling your tool:

1. [The SpongeBob Official Channel on YouTube](https://www.youtube.com/channel/UCx27Pkk8plpiosF14qXq-VA): ...
2. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
3. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
4. [Wikipedia - Excavators](https://en.wikipedia.org/wiki/Excavator): ...
5. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...


Given the results above and an example topic of Spongebob, the Youtube channel is going to be relatively useless for written research, so you should skip it from your list. The Wikipedia article on Excavators is not related to the topic, which is Spongebob for this example, so it should be omitted. The others are relevant so you should include them in your response like this:
1. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
2. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
3. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...
"""

This is a bit of a long prompt, but it’s quite simple. The Tavily agent is tasked with finding relevant URLs for a given query, and then returning the URLs that are relevant for further research. The prompt gives an example of what the response should look like and also gives an example of what URLs are relevant and what URLs are not.

Now let’s define the system prompt for the web research agent:

RESEARCHER_SYSTEM_PROMPT = """
You are an internet research information-providing agent. You will receive results for a search query. The results will look something like this:

1. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
2. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
3. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...

Your job is to use your research tool to find more information on the topic and to write an article about the information you find in markdown format. You will call the research tool with a list of URLs, so for the above example your tool input will be:

["https://en.wikipedia.org/wiki/SpongeBob_SquarePants", "https://www.nick.com/shows/spongebob-squarepants", "https://www.imdb.com/title/tt0206512/"]

After you have finished your research you will write a long-form article on all the information you found and return it to the user, making sure not to leave out any relevant details. Make sure you include as much detail as possible and that the article you write is on the topic (for instance Pokemon) instead of being about the websites that you visited (e.g. Wikipedia, YouTube). Use markdown formatting and supply ONLY the resulting article in your response, with no extra chatter except for the fully formed, well-written, and formatted article. Use headers, sub-headers, bolding, bullet lists, and other markdown formatting to make the article easy to read and understand. Your only output will be the fully formed and detailed markdown article.
"""

The agent is tasked with using the web research tool to find more information on a topic and then writing an article about the information found. The prompt gives an example of what the input to the tool should look like and then specific instructions on using markdown formatting to write the output article and details on the article we want it to write. Save and close the web_research_prompts.py file.

Web research graph main file

Now let’s move on to the main file web_research.py and start by importing the necessary modules:

import asyncio
import functools
import operator
import uuid
from typing import Annotated, Sequence, TypedDict

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

from setup_environment import set_environment_variables
from tools.pdf import OUTPUT_DIRECTORY
from tools.web import research
from web_research_prompts import RESEARCHER_SYSTEM_PROMPT, TAVILY_AGENT_SYSTEM_PROMPT

You’ve seen pretty much all of these imports before in some part of our code so far. We import the prompts we just created and the web research function as well as the OUTPUT_DIRECTORY we defined in the pdf.py file so that we can access this folder to save our output. To do this properly it would be best to store these project-wide constants like the paths in a separate file but for now, we’ll just import it from pdf.py.

Now continue below the imports:

set_environment_variables("Web_Search_Graph")

TAVILY_TOOL = TavilySearchResults(max_results=6)
LLM = ChatOpenAI(model="gpt-3.5-turbo-0125")

TAVILY_AGENT_NAME = "tavily_agent"
RESEARCH_AGENT_NAME = "search_evaluator_agent"
SAVE_FILE_NODE_NAME = "save_file"

We load up our variables and use the project name Web_Search_Graph for our LangSmith traces. We create a new instance of the Tavily search tool we imported setting the max_results to 6, and we create a ChatOpenAI object as usual. After that we set up some string constants for the names of our agents and nodes again.

We’ll have the create_agent function which is basically the same as last time:

def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore
    return executor

No real changes there so let’s move on to the AgentState definition:

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

We have a simple list of BaseMessage objects and every node in the graph will add a message to this list as the state passes through that particular node.

Creating our nodes

Now we’ll have a basic function to create a new agent node similar to what we’ve done before, but this time we’ll also have one to create an async agent node:

def agent_node(state: AgentState, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}


async def async_agent_node(state: AgentState, agent, name):
    result = await agent.ainvoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

The first one is pretty much the same as the one we used before, making sure we return a message in line with what we defined each node will add to the state object. The second one is the async version of the same function. We use async def and here we await the agent’s ainvoke method instead of just calling the normal invoke method.

Now we can create our Tavily agent and our research agent:

tavily_agent = create_agent(LLM, [TAVILY_TOOL], TAVILY_AGENT_SYSTEM_PROMPT)
tavily_agent_node = functools.partial(
    agent_node, agent=tavily_agent, name=TAVILY_AGENT_NAME
)


research_agent = create_agent(LLM, [research], RESEARCHER_SYSTEM_PROMPT)
research_agent_node = functools.partial(
    async_agent_node, agent=research_agent, name=RESEARCH_AGENT_NAME
)

You’ve seen all of this before, but make sure you use the async_agent_node function for the research agent instead of the normal one.

Now we need one more node, that will take the output of the research agent and write it to a file for us. This node does not need any agents or LLM action, so we can just define it as a normal function:

def save_file_node(state: AgentState):
    markdown_content = str(state["messages"][-1].content)
    filename = f"{OUTPUT_DIRECTORY}/{uuid.uuid4()}.md"
    with open(filename, "w", encoding="utf-8") as file:
        file.write(markdown_content)
    return {
        "messages": [
            HumanMessage(
                content=f"Output written successfully to {filename}",
                name=SAVE_FILE_NODE_NAME,
            )
        ]
    }

This shows that the graph is really nothing but a state machine. We can just write any arbitrary function and use it as a node as long as we meet the conditions we set for the graph. The function takes the AgentState object as input, does whatever it wants to do, and then adds an update to the AgentState object as promised. It doesn’t matter that there is no agent or LLM in this step.

In this case, we extract the markdown content from the state object’s last message [-1] which is the research node’s output. We then generate a random filename using the uuid module and write the markdown content to a file with that name and the .md extension. Finally, we return a message to the state object that the output was written successfully.

Piecing our graph together

Now we can define our graph:

workflow = StateGraph(AgentState)
workflow.add_node(TAVILY_AGENT_NAME, tavily_agent_node)
workflow.add_node(RESEARCH_AGENT_NAME, research_agent_node)
workflow.add_node(SAVE_FILE_NODE_NAME, save_file_node)

workflow.add_edge(TAVILY_AGENT_NAME, RESEARCH_AGENT_NAME)
workflow.add_edge(RESEARCH_AGENT_NAME, SAVE_FILE_NODE_NAME)
workflow.add_edge(SAVE_FILE_NODE_NAME, END)

workflow.set_entry_point(TAVILY_AGENT_NAME)
research_graph = workflow.compile()

We just go from the Tavily agent to the research agent, and then from the research agent to the save file node. This example is pretty simple as we’re focusing on the async part. We can always add this to more complex graphs later on if we need to.

Now let’s create a main function to run the graph:

async def run_research_graph(input):
    async for output in research_graph.astream(input):
        for node_name, output_value in output.items():
            print("---")
            print(f"Output from node '{node_name}':")
            print(output_value)
        print("\n---\n")

This function is an async function that takes an input and then runs the graph with that input. It uses an async for loop to iterate over the output of the graph after we run astream (async stream) on it. For each output, we get the node’s name and the output value, so we print both to the console to see what is going on live.

Now we can run the graph with a simple test input:

test_input = {"messages": [HumanMessage(content="Jaws")]}

asyncio.run(run_research_graph(test_input))

We create the first input message for the state object and then use asyncio.run as we did before because it takes care of the event loop that runs the async code for us. Save and run this file and you should see the graph running and outputting the results to the console:

API Keys loaded and tracing set with project name:  Web_Search_Graph
Output from node 'tavily_agent':
---
{'messages': [HumanMessage(content='Here are some relevant sources about "Jaws": ... ', name='tavily_agent')]}

---

URL: https://www.imdb.com/title/tt0073195/ - fetched successfully.
URL: https://www.rottentomatoes.com/m/jaws - fetched successfully.
URL: https://www.britannica.com/topic/Jaws-film-by-Spielberg - fetched successfully.
URL: https://en.wikipedia.org/wiki/Jaws_(film) - fetched successfully.
Output from node 'search_evaluator_agent':
---
{'messages': [HumanMessage(content='# **Jaws: A Deep Dive into the Iconic Film**\n\n## markdown summary here... ', name='search_evaluator_agent')]}

---

Output from node 'save_file':
---
{'messages': [HumanMessage(content='Output written successfully to c:\\Coding_Vault\\FINX_LANGGRAPH_TUTS\\output/d22855f8-9f76-4fc6-8192-7490852e1644.md', name='save_file')]}

---

Output from node '__end__':
---
{'messages': ['The whole state object...']}

---

Go ahead and open the .md file that was created in the output folder and you should see the markdown article that was written by the research agent:

I’ve gone ahead and tried another one inputting the topic “Pokemon”:

There you go! We’ve created a pretty fast and very useful internet research and article-writing tool!

From here on we can create PDF files, send emails, write articles, or do anything and everything we want really. We can tweak the output or the number of input URLs, or use gpt-4-turbo if we want a very long output article and large input context window so we can use even more sources.

We can add any conditional edges and paths and have the agents do whatever we want! All we’ve shown is just the basic ways in which you can combine stuff. You now have all the knowledge you need to build whatever you want. I’ll leave the rest up to your imagination.

It’s been a pleasure to take this journey together. I hope you learned a lot and had some fun along the way. I’ll see you again soon in the next one, until then, happy coding!

๐Ÿ’ก Info: This course is a complete text tutorial. It’s based on our academy course. If you’re interested in video explainers, check out the course here.

By the way, we’ve been featured on Feedspot’s Top 10 Python Blogs list! ๐Ÿฅณ