Update 11/21/24
- fixed example workflow
I needed a web crawler/scraper (at the request of the domain owner) and a way to store the generated embeddings. The code below has `load_embeddings` and `save_embeddings` functions; by default these are unused. When enabled, they store/load a file in pickle format (which preserves numpy arrays better than CSV, but you can change the code). You could also save to a Retool DB, with the URL as one column and the embeddings JSON object as another column.
To enable file storage if you're using the code outside of a workflow (probably in a server app for RPC calls), change the corresponding lines in `main()` to the following:
# Option to load existing crawl data
# existing_crawls = False
# uncomment below to enable storing/loading of the embedding data
existing_crawls = [f for f in os.listdir("raw_data") if f.startswith(f"crawled_data_{domain}")]
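For reference, here's a minimal sketch of pickle-based storage helpers if you want them outside the class (the processed/embeddings.pkl path matches what save_embeddings writes in the code below; the standalone load_embeddings shown here is illustrative):

import pandas as pd

def save_embeddings(df: pd.DataFrame, path: str = "processed/embeddings.pkl") -> None:
    # pickle keeps the numpy embedding arrays intact, unlike CSV
    df.to_pickle(path)

def load_embeddings(path: str = "processed/embeddings.pkl") -> pd.DataFrame:
    # reload a previously saved embeddings DataFrame
    return pd.read_pickle(path)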
Required env variables:
openai_scraper = 'YOUR_OPENAI_API_KEY'
Additional env variable (required for the demo workflow only):
TRUGARD_API_KEY = ''  # or your API key; there's a free version if you're curious, otherwise just leave it blank
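Inside the workflow the key is read as a Retool config variable (retoolContext.configVars.openai_scraper, as in main() below); outside Retool you'd read it from the OS environment instead, something like:

import os
api_key = os.environ.get("openai_scraper")  # or whatever name you gave the env variable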
Example Workflow:
Web Scraper.json (111.1 KB)
- if `debug = true`, all webhook parameters are always forced to default values (see the parameter-handling sketch after this list)
- if `false`, any missing parameters use default values
- shows using the output of one model as the input for another (see the `classify_intent` sketch after this list):
  - input classified as `social media` sends the output of the initial base_model response to a specialized AI model
  - input classified as `create code` sends the output of the initial base_model response to another specialized AI model
  - all other input returns the initial base_model response
- new tasks (like image generation, file creation, and so on) are easily added to `classify_intent` and `branch_on_task`
- lays out a framework for adding other models for specific tasks with the branch blocks `branch_on_base_model`, `branch_on_social_media_model`, and `branch_on_code_model`
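A minimal sketch of the debug/parameter handling, assuming hypothetical parameter names and defaults (the attached workflow defines its own in its normalize_input block):

# hypothetical defaults -- the real workflow defines its own
DEFAULTS = {"url": "apidocs.trugard.ai", "input": "What does this API do?"}

def normalize_params(params: dict, debug: bool) -> dict:
    """If debug is true, ignore the webhook payload entirely; otherwise only fill in missing values."""
    if debug:
        return dict(DEFAULTS)
    return {key: params.get(key, default) for key, default in DEFAULTS.items()}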
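And a rough sketch of the classify_intent idea: a code block asks the model for a task label, and the branch blocks (branch_on_task, then branch_on_base_model / branch_on_social_media_model / branch_on_code_model) route on that label. The prompt and model below are illustrative, not the exact ones in the attached workflow:

from openai import OpenAI

def classify_intent(client: OpenAI, user_input: str) -> str:
    """Return a task label the downstream branch blocks can switch on."""
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system",
             "content": "Classify the user input as one of: social media, create code, other. "
                        "Reply with the label only."},
            {"role": "user", "content": user_input},
        ],
        temperature=0,
        max_tokens=10,
    )
    return response.choices[0].message.content.strip().lower()

# branch_on_task compares this label: "social media" and "create code" forward the
# base_model response to their specialized models, anything else returns it as-is.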
PYTHON LIBRARIES:
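The code block below uses these third-party packages (everything else it imports is in the Python standard library): openai, requests, beautifulsoup4 (imported as bs4), pandas, numpy, tiktoken, tqdm, scipy.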
PYTHON CODE BLOCK:
import os
import json
import re
import time
import requests
import tiktoken
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse
from openai import OpenAI
from typing import List, Dict, Any, Optional, Tuple
from tqdm import tqdm
from scipy.spatial.distance import cosine
from datetime import datetime
def remove_newlines(serie: pd.Series) -> pd.Series:
"""Remove newlines and extra spaces from a pandas Series of strings."""
    serie = serie.str.replace('\n', ' ')
    serie = serie.str.replace('\\n', ' ')
    # collapse leftover double spaces into single spaces
    serie = serie.str.replace('  ', ' ')
    serie = serie.str.replace('  ', ' ')
return serie
class HyperlinkParser(HTMLParser):
"""HTML parser for extracting hyperlinks from web pages.
Attributes:
hyperlinks: List of extracted hyperlink URLs
"""
def __init__(self):
super().__init__()
self.hyperlinks: List[str] = []
def handle_starttag(self, tag: str, attrs: List[tuple]) -> None:
"""Extract href attributes from anchor tags."""
attrs_dict = dict(attrs)
if tag == "a" and "href" in attrs_dict:
self.hyperlinks.append(attrs_dict["href"])
class WebCrawler:
"""Web crawler for domain-specific content extraction and Q&A functionality.
Implements breadth-first crawling, content extraction, embedding generation,
and question-answering capabilities using OpenAI's API.
Attributes:
client: OpenAI API client
domain: Target domain to crawl
start_url: Starting URL for crawling
crawled_data: List of dictionaries containing crawled page data
tokenizer: Tokenizer for text processing
"""
HTTP_URL_PATTERN = re.compile(r'^https?://.+$')
def __init__(self, api_key: str, domain: str, start_url: str):
"""Initialize crawler with API key and target domain information.
Args:
api_key: OpenAI API key
domain: Domain to crawl
start_url: Starting URL for crawling
"""
self.client = OpenAI(api_key=api_key)
self.domain = domain
self.start_url = start_url
self.crawled_data: List[Dict[str, str]] = []
self.setup_directories()
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def print_ascii_histogram(self, data: pd.Series, bins: int = 10) -> None:
"""Generate and print ASCII histogram of numerical data.
Args:
data: Series of numerical values
bins: Number of histogram bins
"""
if len(data) == 0:
print("No data to create histogram")
return
min_val = data.min()
max_val = data.max()
bin_width = (max_val - min_val) / bins if min_val != max_val else 1
hist_data = []
for i in range(bins):
bin_start = min_val + i * bin_width
bin_end = bin_start + bin_width
count = len(data[(data >= bin_start) & (data < bin_end)])
hist_data.append((bin_start, bin_end, count))
max_count = max(count for _, _, count in hist_data)
width = 40
print("\nToken Count Distribution:")
print("-" * 60)
for bin_start, bin_end, count in hist_data:
bar_length = int((count / max_count) * width) if max_count > 0 else 0
bar = "█" * bar_length
print(f"{bin_start:6.0f}-{bin_end:6.0f} | {count:4d} | {bar}")
print("-" * 60)
def setup_directories(self) -> None:
"""Create required directories for storing crawled and processed data."""
os.makedirs(f"text/{self.domain}/", exist_ok=True)
os.makedirs("processed", exist_ok=True)
os.makedirs("raw_data", exist_ok=True)
def save_crawled_data(self) -> str:
"""Save raw crawled data to timestamped JSON file.
Returns:
Path to saved file
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"raw_data/crawled_data_{self.domain}_{timestamp}.json"
with open(filename, 'w') as f:
json.dump({
'domain': self.domain,
'start_url': self.start_url,
'crawl_time': timestamp,
'data': self.crawled_data
}, f, indent=2)
print(f"\nRaw crawled data saved to: {filename}")
return filename
def load_crawled_data(self, filename: str) -> None:
"""Load previously crawled data from JSON file.
Args:
filename: Path to JSON file containing crawled data
"""
try:
with open(filename, 'r') as f:
data = json.load(f)
self.crawled_data = data['data']
self.domain = data['domain']
print(f"\nLoaded {len(self.crawled_data)} pages from {filename}")
except Exception as e:
print(f"Error loading crawled data: {str(e)}")
def get_crawl_stats(self) -> Dict[str, Any]:
"""Calculate statistics about the crawled data.
Returns:
Dictionary containing page count, text length stats, and URL depth distribution
"""
total_pages = len(self.crawled_data)
total_text_length = sum(len(item['text']) for item in self.crawled_data)
avg_text_length = total_text_length / total_pages if total_pages > 0 else 0
urls_by_depth = {}
for item in self.crawled_data:
depth = item['url'].count('/') - 3 # Subtract 3 for http://domain.com/
urls_by_depth[depth] = urls_by_depth.get(depth, 0) + 1
return {
'total_pages': total_pages,
'total_text_length': total_text_length,
'avg_text_length': avg_text_length,
'urls_by_depth': urls_by_depth
}
def get_hyperlinks(self, url: str) -> List[str]:
"""Extract all hyperlinks from a webpage.
Args:
url: URL to extract links from
Returns:
List of hyperlink URLs found on the page
"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
if not response.headers.get('Content-Type', '').startswith("text/html"):
return []
parser = HyperlinkParser()
parser.feed(response.text)
return parser.hyperlinks
except requests.exceptions.RequestException as e:
print(f"Error fetching {url}: {str(e)}")
return []
def clean_url(self, link: str) -> str:
"""Normalize and clean URL format.
Args:
link: URL to clean
Returns:
Cleaned and normalized URL
"""
if link.startswith("/"):
link = link[1:]
return f"https://{self.domain}/{link}"
def get_domain_hyperlinks(self, url: str) -> List[str]:
"""Get hyperlinks that belong to the same domain."""
clean_links = set()
for link in set(self.get_hyperlinks(url)):
if not link or link.startswith(("mailto:", "#", "tel:")):
continue
if self.HTTP_URL_PATTERN.match(link):
url_obj = urlparse(link)
if url_obj.netloc != self.domain:
continue
clean_link = link
else:
clean_link = self.clean_url(link)
clean_links.add(clean_link.rstrip('/'))
return list(clean_links)
def get_page_title(self, soup: BeautifulSoup) -> str:
"""Extract page title and ensure it's a string."""
if soup.title and soup.title.string:
return str(soup.title.string).strip()
return "No title"
def get_page_text(self, soup: BeautifulSoup) -> str:
"""Extract and clean page text content."""
try:
text_elements = soup.stripped_strings
text = ' '.join(str(element) for element in text_elements)
return ' '.join(text.split())
except Exception as e:
print(f"Error extracting text: {str(e)}")
return ""
def crawl(self) -> List[Dict[str, str]]:
"""Perform the web crawling operation."""
queue = deque([self.start_url])
seen = {self.start_url}
while queue:
current_url = queue.popleft()
print(f"Crawling: {current_url}")
try:
response = requests.get(current_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
text = self.get_page_text(soup)
if "You need to enable JavaScript to run this app." in text:
print(f"JavaScript required for {current_url} - skipping")
continue
self.crawled_data.append({
"url": current_url,
"text": text,
"title": self.get_page_title(soup)
})
for link in self.get_domain_hyperlinks(current_url):
if link not in seen:
queue.append(link)
seen.add(link)
except requests.exceptions.RequestException as e:
print(f"Error processing {current_url}: {str(e)}")
continue
return self.crawled_data
def process_crawled_data(self) -> Tuple[pd.DataFrame, str]:
"""
Process crawled data into a pandas DataFrame and save both raw and processed data.
Returns:
Tuple containing:
- DataFrame with processed data
- Path to the saved raw data file
"""
# Save raw data first
raw_data_path = self.save_crawled_data()
# Process into DataFrame
processed_data = []
for item in self.crawled_data:
fname = urlparse(item['url']).path
if fname.startswith('/'):
fname = fname[1:]
fname = fname.replace('-', ' ').replace('_', ' ').replace('#update', '')
text = f"{item['title']}. {item['text']}"
text = ' '.join(text.split())
processed_data.append({
'title': fname,
'text': text,
'url': item['url'],
'raw_title': item['title']
})
df = pd.DataFrame(processed_data)
df['text'] = df.raw_title + ". " + remove_newlines(df.text)
# Save processed data
os.makedirs('processed', exist_ok=True)
df.to_csv('processed/scraped.csv', index=False)
return df, raw_data_path
def split_into_many(self, text: str, max_tokens: int = 500) -> List[str]:
"""Split text into chunks of maximum token length."""
sentences = text.split('. ')
n_tokens = [len(self.tokenizer.encode(" " + sentence)) for sentence in sentences]
chunks = []
tokens_so_far = 0
chunk = []
for sentence, token in zip(sentences, n_tokens):
if tokens_so_far + token > max_tokens:
if chunk:
chunks.append(". ".join(chunk) + ".")
chunk = []
tokens_so_far = 0
if token > max_tokens:
continue
chunk.append(sentence)
tokens_so_far += token + 1
if chunk:
chunks.append(". ".join(chunk) + ".")
return chunks
def analyze_tokens(self, df: pd.DataFrame) -> pd.DataFrame:
"""Analyze token counts in the text data."""
df['n_tokens'] = df.text.apply(lambda x: len(self.tokenizer.encode(x)))
print("\nToken count statistics:")
print(df['n_tokens'].describe())
self.print_ascii_histogram(df['n_tokens'])
return df
def process_text_into_chunks(self, df: pd.DataFrame, max_tokens: int = 500) -> pd.DataFrame:
"""Process the DataFrame text into chunks."""
shortened = []
for _, row in df.iterrows():
if row['text'] is None:
continue
if row['n_tokens'] > max_tokens:
shortened.extend(self.split_into_many(row['text'], max_tokens))
else:
shortened.append(row['text'])
chunks_df = pd.DataFrame({
'text': shortened,
'n_tokens': [len(self.tokenizer.encode(text)) for text in shortened]
})
print(f"\nOriginal number of documents: {len(df)}")
print(f"Number of chunks created: {len(chunks_df)}")
print(f"\nToken count statistics for chunks:")
print(chunks_df['n_tokens'].describe())
# Save the chunked data
chunks_df.to_csv('processed/scraped_chunks.csv', index=False)
return chunks_df
def create_embeddings(self, df: pd.DataFrame,
batch_size: int = 100,
max_retries: int = 3,
retry_delay: int = 5,
model: str = "text-embedding-ada-002") -> pd.DataFrame:
"""
Create embeddings for the text data in batches with error handling and retries.
Args:
df: DataFrame containing the text data
batch_size: Number of texts to process in each batch
max_retries: Maximum number of retry attempts for failed requests
retry_delay: Delay in seconds between retries
model: The OpenAI embedding model to use
Returns:
DataFrame with embeddings added
"""
print("\nGenerating embeddings...")
def get_embedding(text: str, attempt: int = 1) -> List[float]:
"""Helper function to get embeddings with retry logic"""
try:
response = self.client.embeddings.create(
input=text,
model=model
)
return response.data[0].embedding
except Exception as e:
if attempt < max_retries:
print(f"Error getting embedding (attempt {attempt}): {str(e)}")
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
return get_embedding(text, attempt + 1)
else:
print(f"Failed to get embedding after {max_retries} attempts: {str(e)}")
return [0.0] * 1536 # Return zero vector for failed embeddings
# Initialize empty embeddings list
all_embeddings = []
# Process in batches with progress bar
for i in tqdm(range(0, len(df), batch_size), desc="Processing batches"):
batch = df.iloc[i:i+batch_size]
# Get embeddings for the batch
batch_embeddings = [get_embedding(text) for text in batch['text']]
all_embeddings.extend(batch_embeddings)
# Optional: Add delay between batches to avoid rate limits
time.sleep(0.1)
# Add embeddings to DataFrame
df['embeddings'] = all_embeddings
# Save embeddings
self.save_embeddings(df)
return df
def save_embeddings(self, df: pd.DataFrame) -> None:
"""
Save the DataFrame with embeddings in both CSV and pickle format.
The pickle format preserves the numpy arrays better than CSV.
"""
# Save as CSV (embeddings will be stored as string)
csv_path = 'processed/embeddings.csv'
df.to_csv(csv_path, index=False)
print(f"\nEmbeddings saved to {csv_path}")
# Save as pickle (preserves numpy arrays)
pickle_path = 'processed/embeddings.pkl'
df.to_pickle(pickle_path)
print(f"Embeddings saved to {pickle_path}")
def distances_from_embeddings(
self,
query_embedding: List[float],
embeddings: List[List[float]],
distance_metric: str = 'cosine'
) -> np.ndarray:
"""
Calculate distances between a query embedding and a list of embeddings.
Args:
query_embedding: The embedding of the query text
embeddings: List of embeddings to compare against
distance_metric: The distance metric to use (currently supports 'cosine')
Returns:
Array of distances
"""
distances = []
for embedding in embeddings:
if distance_metric == 'cosine':
distance = cosine(query_embedding, embedding)
else:
raise ValueError(f"Unsupported distance metric: {distance_metric}")
distances.append(distance)
return np.array(distances)
def create_context(
self,
input: str,
df: pd.DataFrame,
max_len: int = 1800,
model: str = "text-embedding-ada-002"
) -> str:
"""
        Create a context for an input by finding the most similar content in the dataframe.
Args:
input: The input to create context for
df: DataFrame containing the text data and embeddings
max_len: Maximum length of the context in tokens
model: The embedding model to use
Returns:
String containing the context
"""
try:
# Get the embeddings for the input
response = self.client.embeddings.create(
input=input,
model=model
)
q_embeddings = response.data[0].embedding
# Get the distances from the embeddings
df['distances'] = self.distances_from_embeddings(
q_embeddings,
df['embeddings'].values.tolist()
)
returns = []
cur_len = 0
# Sort by distance and add the text to the context until the context is too long
for _, row in df.sort_values('distances', ascending=True).iterrows():
# Add the length of the text to the current length
cur_len += row['n_tokens'] + 4
# If the context is too long, break
if cur_len > max_len:
break
# Else add it to the text that is being returned
returns.append(row["text"])
# Return the context
return "\n\n###\n\n".join(returns)
except Exception as e:
print(f"Error creating context: {str(e)}")
return ""
def answer_input(
self,
df: pd.DataFrame,
input: str,
model: str = "gpt-3.5-turbo",
max_len: int = 1800,
max_tokens: int = 150,
temperature: float = 0,
debug: bool = False,
stop_sequence: Optional[List[str]] = None
) -> str:
"""
        Answer an input based on the most similar context from the dataframe texts.
Args:
df: DataFrame containing the text data and embeddings
input: The input to answer
model: The model to use for generating the answer
max_len: Maximum length of the context in tokens
max_tokens: Maximum tokens in the response
temperature: Temperature for response generation
debug: Whether to print debug information
stop_sequence: Optional list of stop sequences
Returns:
String containing the answer
"""
# Create context
context = self.create_context(
input,
df,
max_len=max_len,
)
# If debug, print the context
if debug:
print("Context:\n" + context)
print("\n\n")
if not context:
return "Failed to create context for the input"
try:
# Create a chat completion using the input and context
response = self.client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": "Always refer to the context below to form a response to all input, and if the input has no response based on the given context, say \"I don't know\"\n\n"
},
{
"role": "user",
"content": f"Context: {context}\n\n---\n\nInput: {input}\nResponse:"
}
],
temperature=temperature,
max_tokens=max_tokens,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
stop=stop_sequence,
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"Error generating answer: {str(e)}")
return ""
def send_input(self, df: pd.DataFrame) -> str:
"""
        Send the input, using the crawled content and embeddings as context.
Args:
df: DataFrame containing the text data and embeddings
"""
input = normalize_input.data.input
if input.lower() == 'exit':
return "none"
if not input:
return "none"
print("\nGenerating answer...")
answer = self.answer_input(
df,
input=input,
debug=True
)
print(f"\nAnswer: {answer}")
return answer
def main():
"""Main function to run the web crawler and QA system."""
api_key = retoolContext.configVars.openai_scraper
if not api_key:
        raise ValueError("openai_scraper config variable not set")
domain = normalize_input.data.url
start_url = domain
if domain.startswith('apidocs.trugard.ai'):
start_url = "https://" + start_url + "/home"
    elif not domain.startswith('https'):
start_url = "https://" + start_url
crawler = WebCrawler(api_key, domain, start_url)
# Option to load existing crawl data
existing_crawls = False
# uncomment below to enable storing/loading of the embedding data
# existing_crawls = [f for f in os.listdir("raw_data") if f.startswith(f"crawled_data_{domain}")]
if existing_crawls:
print("\nFound existing crawl data:")
for i, file in enumerate(sorted(existing_crawls)):
print(f"{i+1}. {file}")
choice = input("\nEnter number to load existing crawl, or press Enter to crawl again: ")
if choice.strip():
try:
crawler.load_crawled_data(f"raw_data/{existing_crawls[int(choice)-1]}")
except (ValueError, IndexError):
print("Invalid choice, proceeding with new crawl")
crawler.crawl()
else:
crawler.crawl()
else:
# Perform new crawl
crawler.crawl()
# Print crawl statistics
stats = crawler.get_crawl_stats()
print("\nCrawl Statistics:")
print(f"Total pages crawled: {stats['total_pages']}")
print(f"Average text length: {stats['avg_text_length']:.2f} characters")
print("\nPages by URL depth:")
for depth, count in sorted(stats['urls_by_depth'].items()):
print(f"Depth {depth}: {count} pages")
# Process data
df, raw_data_path = crawler.process_crawled_data()
print(f"\nRaw data saved to: {raw_data_path}")
# Analyze tokens and create chunks
df = crawler.analyze_tokens(df)
chunks_df = crawler.process_text_into_chunks(df)
# Generate embeddings
chunks_df_with_embeddings = crawler.create_embeddings(chunks_df)
# Send AI Input with embeddings
return crawler.send_input(chunks_df_with_embeddings)
return main()