Retrieval-Augmented Generation (RAG)
This cookbook shows you how to create a Retrieval-Augmented Generation (RAG) pipeline using PostgreSQL and PGVector.
Follow the cookbook to:
- Create two Redpanda Connect data pipelines: one for creating vector embeddings with Ollama and another for searching the data using PGVector.
- Run the two pipelines in parallel using streams mode.
Compute the embeddings
Start by defining the indexing pipeline, which takes textual data from Redpanda, computes vector embeddings for it, and then writes it into a PostgreSQL table with a PGVector index on the embeddings column. Before building the pipeline, create an articles topic and produce a sample document to it:
rpk topic create articles
echo '{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}' | rpk topic produce articles -f '%v'
Your indexing pipeline can read from the Redpanda topic using the kafka input:
input:
  kafka:
    addresses: [ "${REDPANDA_CLUSTER}" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: "${REDPANDA_USER}"
      password: "${REDPANDA_PASSWORD}"
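If you’re experimenting against a local, unauthenticated Redpanda cluster instead, the same input works without the tls and sasl blocks. A minimal sketch, assuming a broker at localhost:9092:

input:
  kafka:
    addresses: [ "localhost:9092" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group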
Use Nomic Embed to compute embeddings. Since each request only applies to a single document, you can scale this by making requests in parallel across batches of documents (see the batching sketch after the pipeline config below).
To send a mapped request and map the response back into the original document, use the branch processor with a child ollama_embeddings processor.
pipeline:
  threads: -1
  processors:
    - branch:
        request_map: 'root = "search_document: %s\n%s".format(this.article.title, this.article.content)'
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
        result_map: 'root.article.embeddings = this'
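The parallel scaling mentioned earlier comes from threads: -1, which lets every pipeline thread work on a separate batch of documents. One way to form multi-document batches is a batching policy on the kafka input. This is a minimal sketch; the count and period values are illustrative assumptions, not recommendations:

input:
  kafka:
    addresses: [ "${REDPANDA_CLUSTER}" ]
    topics: [ articles ]
    consumer_group: rp_connect_articles_group
    # tls and sasl settings as shown earlier
    batching:
      count: 20   # up to 20 articles per batch
      period: 1s  # flush a partial batch after one second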
With this pipeline, your processed documents should look something like this (the embeddings vector is truncated here; the real one has 768 dimensions):
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "embeddings": [0.754, 0.19283, 0.231, 0.834]
  }
}
Now, try sending this transformed data to PostgreSQL using the sql_insert output. You can take advantage of the init_statement functionality to set up pgvector and a table to write the data to.
output:
  sql_insert:
    driver: postgres
    dsn: "${PG_CONNECTION_STRING}"
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS searchable_text (
        id varchar(128) PRIMARY KEY,
        title text NOT NULL,
        body text NOT NULL,
        embeddings vector(768) NOT NULL
      );
      CREATE INDEX IF NOT EXISTS text_hnsw_index
        ON searchable_text
        USING hnsw (embeddings vector_l2_ops);
    table: searchable_text
    columns: ["id", "title", "body", "embeddings"]
    args_mapping: "[this.article.id, this.article.title, this.article.content, this.article.embeddings.vector()]"
Save this pipeline as indexing.yaml and run it with rpk connect run ./indexing.yaml to make sure your PostgreSQL table is populated with embeddings.
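To spot-check the table from psql before moving on, a query along these lines should do; vector_dims is provided by the pgvector extension:

-- Confirm rows arrived and the embeddings have the expected 768 dimensions
SELECT id, title, vector_dims(embeddings) AS dims
FROM searchable_text
LIMIT 5;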
Generate search responses
To generate responses to questions using the dataset you’re creating embeddings for, use an http_server input to receive the questions. Since you’re running this pipeline in streams mode, the HTTP server exposes its endpoints prefixed with the stream identifier, which in this case is the same as the filename: search.
input:
  http_server:
    path: /
    allowed_verbs: [ GET ]
    sync_response:
      headers:
        Content-Type: application/json
The user query is passed in the query parameter q: http://localhost:4195/search?q=question_here. Since query parameters are exposed as metadata by the http_server input, you can reference it in Bloblang with @q.
pipeline:
  processors:
    - label: compute_embeddings
      ollama_embeddings:
        model: nomic-embed-text
        text: '"search_query: %s".format(@q)'
After the compute_embeddings step, the message payload is the embeddings vector. To fetch the three most similar documents, you can write a PostgreSQL query in the sql_raw processor. The <-> operator computes L2 distance, which matches the vector_l2_ops HNSW index created by the indexing pipeline.
    - sql_raw:
        driver: "postgres"
        dsn: "${PG_CONNECTION_STRING}"
        query: SELECT title, body FROM searchable_text ORDER BY embeddings <-> $1 LIMIT 3
        args_mapping: "[ this.vector() ]"
With the looked-up documents, as well as the initial query, you can use the ollama_chat processor to respond to the user’s question as text.
    - label: generate_response
      ollama_chat:
        model: llama3.1
        prompt: |
          Your task is to respond to user queries using the provided information.
          The user asked: ${! @q }
          Context: ${!this.map_each(row -> "%s\n%s".format(row.title, row.body)).join("\n\n")}
          Response:
Now that you’ve generated a response, send it back to the HTTP server using the sync_response processor. Then, delete the message with a Bloblang mapping so that nothing goes to the output.
    - mapping: 'root.response = content().string()'
    - sync_response: {}
    - mapping: 'root = deleted()'
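For reference, here is the search pipeline assembled from the snippets above into a single search.yaml. Add an output of your choice if your deployment requires one; nothing will reach it because of the final mapping.

input:
  http_server:
    path: /
    allowed_verbs: [ GET ]
    sync_response:
      headers:
        Content-Type: application/json

pipeline:
  processors:
    - label: compute_embeddings
      ollama_embeddings:
        model: nomic-embed-text
        text: '"search_query: %s".format(@q)'
    - sql_raw:
        driver: "postgres"
        dsn: "${PG_CONNECTION_STRING}"
        query: SELECT title, body FROM searchable_text ORDER BY embeddings <-> $1 LIMIT 3
        args_mapping: "[ this.vector() ]"
    - label: generate_response
      ollama_chat:
        model: llama3.1
        prompt: |
          Your task is to respond to user queries using the provided information.
          The user asked: ${! @q }
          Context: ${!this.map_each(row -> "%s\n%s".format(row.title, row.body)).join("\n\n")}
          Response:
    - mapping: 'root.response = content().string()'
    - sync_response: {}
    - mapping: 'root = deleted()'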
Both pipelines are ready. Try running both of them using streams mode: rpk connect streams indexing.yaml search.yaml.
When some documents have been indexed, you can query the system using:
curl -G 'localhost:4195/search' --data-urlencode 'q=what is happening to the dogs?' | jq
The output should look something like:
{
  "response": "Everyone in the world woke up today shocked as their beloved pooches were silent - unable to bark."
}