-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindexing_pipeline.py
73 lines (59 loc) · 2.95 KB
/
indexing_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
import os
import zipfile
from io import BytesIO
from pathlib import Path
import requests
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.utils import Secret
from couchbase_haystack import CouchbasePasswordAuthenticator, CouchbaseSearchDocumentStore
logger = logging.getLogger(__name__)
def fetch_archive_from_http(url: str, output_dir: str) -> None:
    """Download a zip archive over HTTP and extract it into ``output_dir``.

    The download is skipped when ``output_dir`` already exists as a directory,
    so repeated runs reuse the previously extracted files.

    Args:
        url: Address of the zip archive to download.
        output_dir: Directory the archive contents are extracted into.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status code.
        zipfile.BadZipFile: If the downloaded payload is not a valid zip archive.
    """
    if Path(output_dir).is_dir():
        # `Logger.warn` is a deprecated alias; `warning` with lazy %-args is the supported form.
        logger.warning("'%s' directory already exists. Skipping data download", output_dir)
        return

    # The whole body is buffered in memory for ZipFile anyway, so streaming is not requested.
    with requests.get(url, timeout=10) as response:
        # Fail loudly on HTTP errors instead of trying to unzip an error page.
        response.raise_for_status()
        with zipfile.ZipFile(BytesIO(response.content)) as zip_ref:
            zip_ref.extractall(output_dir)
# Download the sample documents archive into a local folder; the download is
# skipped when that folder already exists.
docs_dir = "data/docs"
fetch_archive_from_http(
    "https://s3.eu-central-1.amazonaws.com/deepset.ai-farm-qa/datasets/documents/wiki_gameofthrones_txt6.zip",
    docs_dir,
)
# A running Couchbase database is required. One way to start it is with Docker:
# docker run \
#     --restart always \
#     --publish=8091-8096:8091-8096 --publish=11210:11210 \
#     --env COUCHBASE_ADMINISTRATOR_USERNAME=admin \
#     --env COUCHBASE_ADMINISTRATOR_PASSWORD=passw0rd \
#     couchbase:enterprise-7.6.2
#
# NOTE: the username/password, bucket, scope, collection and index names below
# are placeholders; replace them with the values of your own cluster (e.g. the
# administrator credentials passed to the Docker command above).
couchbase_auth = CouchbasePasswordAuthenticator(
    username=Secret.from_token("username"),
    password=Secret.from_token("password"),
)
document_store = CouchbaseSearchDocumentStore(
    cluster_connection_string=Secret.from_token("localhost"),
    authenticator=couchbase_auth,
    bucket="haystack_bucket_name",
    scope="haystack_scope_name",
    collection="haystack_collection_name",
    vector_search_index="vector_search_index",
)
# Build the indexing pipeline: text files are converted to documents, cleaned,
# split into chunks of 250 sentences (30 sentences overlap), embedded for dense
# retrieval and finally written to the document store.
p = Pipeline()

# Stages in processing order; each one is registered under its dictionary key.
stages = {
    "text_file_converter": TextFileToDocument(),
    "cleaner": DocumentCleaner(),
    "splitter": DocumentSplitter(split_by="sentence", split_length=250, split_overlap=30),
    "embedder": SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
    "writer": DocumentWriter(document_store=document_store),
}
for stage_name, stage in stages.items():
    p.add_component(stage_name, stage)

# Chain the stages: the `documents` output of each one feeds the `documents`
# input of the next.
connections = (
    ("text_file_converter.documents", "cleaner.documents"),
    ("cleaner.documents", "splitter.documents"),
    ("splitter.documents", "embedder.documents"),
    ("embedder.documents", "writer.documents"),
)
for sender, receiver in connections:
    p.connect(sender, receiver)
# Feed every entry of the downloaded docs directory to the converter stage and
# run the whole pipeline.
docs_root = Path(docs_dir)
file_paths = [docs_root / entry for entry in os.listdir(docs_dir)]
pipeline_inputs = {"text_file_converter": {"sources": file_paths}}
result = p.run(pipeline_inputs)

# If Couchbase runs in a local Docker container, open the Couchbase Web Console
# at <http://localhost:8091> to explore the indexed data.