How Haystack Pipelines and Astra DB Simplify AI App Development

DataStax
6 min read · Mar 7, 2024

By Carter Rabasa

If you’re a developer who has started to explore the possibilities of generative AI (GenAI), you might have noticed how many models are out there, how important it is to choose the right one for your application, and how much orchestration is often required between your code and the large language model. Thankfully, there are tools that help address these challenges, and there’s a great one for Python developers called Haystack.

Haystack is the open source Python framework by deepset for building custom apps with LLMs. It lets you quickly try out the latest models in natural language processing (NLP), and it’s flexible and easy to use. One of the great things about Haystack is the number of AI tools that it integrates with, and this includes vector databases like DataStax Astra DB.

Let’s walk through a simple example of how you can use Haystack pipelines to store and retrieve data in Astra DB on your local machine. If you prefer, we’ve created a notebook that you can use as well.

Setup

To get started, you’ll need a few things:

  • A DataStax Astra account
  • An OpenAI account and API key
  • Python 3 installed on your machine

After you’ve signed up for Astra DB, log in to your account and create a new serverless vector database. Give it any name you like, but for our purposes we’re going to name it “haystack-astradb.” Pick a cloud provider and region; any of them will do for the purposes of this tutorial.

While you’re waiting for the database to provision, create a new directory for your project.

mkdir astra-haystack

In that directory, create a .env file. You’ll use this to store secret credentials and config information for your vector database. Go ahead and populate it with these keys to start:

ASTRA_DB_API_ENDPOINT=
ASTRA_DB_APPLICATION_TOKEN=
OPENAI_API_KEY=
TOKENIZERS_PARALLELISM=false

Note: we set TOKENIZERS_PARALLELISM to false to silence warnings from the sentence-transformers module. For our purposes this is fine, but you'll want to play with that value if you're loading larger data sets.

Create a new API key in your OpenAI account and paste it into the OPENAI_API_KEY field.

After your serverless database has been provisioned (yay!), we’ll need to copy some configuration data for use in our app.

Copy the API Endpoint into the ASTRA_DB_API_ENDPOINT field of your .env file, then generate a new application token and copy it into the ASTRA_DB_APPLICATION_TOKEN field.
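
When you’re done, your .env file should look something like this (the values below are placeholders, not real credentials):

ASTRA_DB_API_ENDPOINT=https://<database-id>-<region>.apps.astra.datastax.com
ASTRA_DB_APPLICATION_TOKEN=AstraCS:<your-token>
OPENAI_API_KEY=sk-<your-key>
TOKENIZERS_PARALLELISM=false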

Installing dependencies

Alright, let’s start coding! First, let’s create a virtual environment so that we’re not polluting our global Python environment with the libraries we’re about to install.

python -m venv venv

Once you’ve done that, go ahead and activate it.

source venv/bin/activate
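
If you’re on Windows, the activation command is slightly different:

venv\Scripts\activate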

Now that the virtual environment is set up, install the three packages that we’ll need for this tutorial.

pip install astra-haystack sentence-transformers python-dotenv

Note: if you run into a build error while installing sentence-transformers, you may be able to fix it on macOS by installing CMake via Homebrew:

brew install cmake

Storing data in Astra DB using a Haystack pipeline

Let’s get started learning about Haystack pipelines by loading some data from the Star Wars Wikipedia page. We’re going to need to do the following:

  • Download the content of the webpage
  • Strip away the markup from the text of the page
  • Break the text up into smaller chunks
  • Compute embeddings for those chunks
  • Store the chunks and embeddings in Astra DB

Thanks to Haystack, all of the above is super simple. Let’s create a script called “load_data.py” and import some dependencies:

import logging
import os
from dotenv import load_dotenv
from haystack import Pipeline
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore

In addition to core parts of Haystack, we’re making use of the AstraDocumentStore, which lets a Haystack pipeline store documents (and their embeddings) in Astra DB.

Now, let’s initialize the elements of the pipeline:

# load variables defined in .env into the environment
load_dotenv()
# turn on logging at the INFO level
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# initialize the fetcher that will download content from a webpage as HTML
fetcher = LinkContentFetcher()
# initialize the converter that will take HTML and turn it into plain text
converter = HTMLToDocument()
# initialize the splitter that will take the text and break it into chunks
splitter = DocumentSplitter(split_by="word", split_length=50)
# define the model that we'll use to create embeddings
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
# initialize the document store
document_store = AstraDocumentStore(
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
    collection_name="starwars",
)
# initialize the Haystack pipeline
index_pipeline = Pipeline()

Please note that the embedding model we’re using outputs 384-dimensional vectors, which is why we pass embedding_dimension=384 when initializing the AstraDocumentStore.
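
If you ever swap in a different embedding model, you can confirm its output dimension with a quick standalone sketch using the sentence-transformers library directly:

from sentence_transformers import SentenceTransformer

# load the same model the pipeline uses and embed a sample string
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embedding = model.encode("May the Force be with you")
print(len(embedding))  # prints 384 for all-MiniLM-L6-v2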

Finally, add these elements to the pipeline and run the pipeline!

# add the components to the pipeline
index_pipeline.add_component(instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name), name="embedder")
index_pipeline.add_component(instance=fetcher, name="fetcher")
index_pipeline.add_component(instance=converter, name="converter")
index_pipeline.add_component(instance=splitter, name="splitter")
index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
# connect the components in the order they should be executed
index_pipeline.connect("fetcher.streams", "converter.sources")
index_pipeline.connect("converter.documents", "splitter.documents")
index_pipeline.connect("splitter.documents", "embedder.documents")
index_pipeline.connect("embedder.documents", "writer.documents")
# run the pipeline
index_pipeline.run(data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Star_Wars"]}})
# print the number of documents processed
print(document_store.count_documents())

Open up a terminal and run the script:

python load_data.py

If all goes well, the Star Wars Wikipedia page will be broken up into roughly 161 chunks, each stored in Astra DB along with its computed embedding. It’s very important that we’re storing both the embedding and the text used to create it; you’ll see why in the next section.
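
If you’d like to verify that yourself, here’s a minimal sketch that reads documents back out of the store. It assumes that calling filter_documents with no filters returns everything, and that the stored embeddings come back on the returned Documents:

from dotenv import load_dotenv
from haystack_integrations.document_stores.astra import AstraDocumentStore

load_dotenv()
document_store = AstraDocumentStore(embedding_dimension=384, collection_name="starwars")
# no filters: fetch all stored documents
docs = document_store.filter_documents()
print(docs[0].content[:100])         # the chunk text
print(len(docs[0].embedding or []))  # 384, if the embedding was returned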

Now this is cool, but you know what’s cooler? Answering Star Wars trivia!

Getting answers to Star Wars questions using a Retriever

Now that we’ve got vector and text data stored in the database, let’s see what it takes to build a simple RAG application. Create a file called “retrieve_data.py” with the following dependencies:

import os
from dotenv import load_dotenv
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever

This time around we’re going to make use of AstraEmbeddingRetriever, which will enable us to run vector search on the documents and related embeddings that we’ve stored.
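
To build some intuition for what the retriever does inside a pipeline, here’s a standalone sketch that embeds a question and queries the store directly; it assumes AstraEmbeddingRetriever follows the run(query_embedding=..., top_k=...) signature that Haystack retrievers generally share:

from dotenv import load_dotenv
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever

load_dotenv()
document_store = AstraDocumentStore(embedding_dimension=384, collection_name="starwars")
# embed the question with the same model used at indexing time
embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()
query_embedding = embedder.run(text="Who trained Luke?")["embedding"]
# fetch the single closest chunk by vector similarity
retriever = AstraEmbeddingRetriever(document_store=document_store)
result = retriever.run(query_embedding=query_embedding, top_k=1)
print(result["documents"][0].content)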

Let’s initialize the elements we’ll be using to retrieve information from the Haystack pipeline:

# load variables defined in .env into the environment
load_dotenv()
# define the model that we'll use to create the embeddings for our question
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
# define the prompt that we're going to send to OpenAI
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
# initialize the document store
document_store = AstraDocumentStore(
    embedding_dimension=384,
    collection_name="starwars",
)
# initialize the Haystack pipeline
rag_pipeline = Pipeline()

If you’re unfamiliar with OpenAI or prompt engineering, pay special attention to how we’ve defined the prompt_template. We can break the prompt down into three parts (a rendered example follows the list):

  1. The instructions to the LLM — “Given these documents, answer the question”
  2. The text contents of the documents
  3. The question being asked
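
To see the prompt the LLM will actually receive, you can render the template yourself. Here’s a minimal sketch using Jinja2 (the templating engine PromptBuilder uses under the hood), with two toy documents standing in for real retrieval results:

from jinja2 import Template
from haystack import Document

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
# toy documents standing in for retrieved chunks
docs = [
    Document(content="Luke Skywalker is a Jedi."),
    Document(content="Leia Organa is Luke's twin sister."),
]
print(Template(prompt_template).render(documents=docs, question="Who is Luke's sister?"))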

Like before, let’s add these elements to the pipeline and run the pipeline!

# add the components to the pipeline
rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model_name), name="embedder")
rag_pipeline.add_component(instance=AstraEmbeddingRetriever(document_store=document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
# connect the components in the order they should be executed
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")
# Run the pipeline
question = "Who is Luke's sister?"
result = rag_pipeline.run(
    {
        "embedder": {"text": question},
        "retriever": {"top_k": 2},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)
print(result["answer_builder"]["answers"][0].data)
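
Because we connected the retriever to answer_builder.documents, the returned answer also carries the chunks it was grounded in, which is handy for debugging. A short addition to the end of the script shows how you might inspect them (assuming the GeneratedAnswer fields shown):

# optional: inspect the retrieved chunks the answer was grounded in
answer = result["answer_builder"]["answers"][0]
for doc in answer.documents:
    print(f"- {doc.content[:80]}...")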

Open up a terminal and run the script:

python retrieve_data.py

If our RAG application worked, you’ll see output like the following:

Leia

Woo hoo! We were able to use the force of Haystack to build a simple RAG application that uses Astra DB and vector search to answer questions about Star Wars.

Wrapping things up

Let’s review what we’ve learned in this blog post:

  • Using Haystack pipelines and AstraDocumentStore to store documents in Astra DB
  • Converting a question into an embedding and using AstraEmbeddingRetriever to find similar documents
  • Building a simple RAG app based on those similar documents and the question provided

You can find these code examples on GitHub and run them in a Colab notebook. I hope you enjoyed this post! Feel free to find me on Twitter if you have any questions.

Written by DataStax

DataStax provides the real-time vector data tools that generative AI apps need, with seamless integration with developers' stacks of choice.