OpenAI Web Crawler for Workflows or RPC (Python)

Update 11/21/24

  • fixed example workflow

I needed a web crawler/scraper (at the request of the domain owner) and a way to store the generated embeddings. The code below has save/load functions for the crawled data and embeddings; by default the load path is unused. When enabled, these store/load a file in pickle format (it preserves numpy arrays better than CSV), but you can change the code. You could also save to a Retool DB with the URL as one column and the embeddings JSON object as another column.
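
As a rough illustration of the Retool DB option, the sketch below serializes each row into a url column and an embeddings JSON column. The names here are assumptions: the chunked DataFrame produced by the code only keeps text/n_tokens/embeddings, so you would need to carry the URL through the chunking step yourself, and the actual insert is left to whatever Retool DB resource you wire up.

    import json
    import pandas as pd

    def rows_for_retool_db(df: pd.DataFrame) -> list:
        """Build url + JSON-embedding rows for a Retool DB insert (illustrative only)."""
        return [
            {"url": row["url"], "embeddings": json.dumps(list(row["embeddings"]))}
            for _, row in df.iterrows()
        ]

    # Pickle round-trip used by the file-storage path:
    # df.to_pickle("processed/embeddings.pkl")
    # df = pd.read_pickle("processed/embeddings.pkl")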


To enable file storage when you're using the code outside of a workflow (probably in a server app for RPC calls), change the existing_crawls snippet in main() to the following:

    # Option to load existing crawl data
    # existing_crawls = False  # workflow default (file storage disabled)
    # storing/loading of the crawl data is enabled here:
    existing_crawls = [f for f in os.listdir("raw_data") if f.startswith(f"crawled_data_{domain}")]

required env variables

openai_scraper = 'YOUR_OPENAI_API_KEY'

additional env variable (required for demo workflow only)

TRUGARD_API_KEY='' # or your API key; there's a free version if you're curious, otherwise just leave it blank
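
For reference, the Python block reads the key through Retool's config vars (see main() below); TRUGARD_API_KEY isn't referenced in the Python code shown here, it's only needed by the demo workflow's other blocks. Outside a workflow you'd read plain environment variables instead; a minimal sketch, assuming the variable names above are exported to the environment:

    import os

    # retoolContext.configVars only exists inside a Retool workflow;
    # in a standalone server app, pull the same values from the environment.
    api_key = os.environ.get("openai_scraper") or os.environ.get("OPENAI_API_KEY")
    trugard_api_key = os.environ.get("TRUGARD_API_KEY", "")  # demo workflow only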

Example Workflow:
Web Scraper.json (111.1 KB)

  • if debug = true, all webhook parameters are forced to default values
    • if false, any missing parameters fall back to default values
  • shows how to use the output of one model as the input for another
  • input classified as social media sends the initial base_model response to a specialized AI model
  • input classified as create code sends the initial base_model response to another specialized AI model
  • all other input returns the initial base_model response
  • new tasks (like image generation, file creation, and so on) are easily added to classify_intent and branch_on_task (see the sketch after this list)
  • lays out a framework for adding other models for specific tasks with branch blocks
    • branch_on_base_model, branch_on_social_media_model, branch_on_code_model
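
The workflow itself does this with Retool blocks; purely as an illustration of the classify-then-branch idea (the prompt, labels, and routing below are assumptions, not the workflow's exact blocks), a Python sketch might look like:

    from openai import OpenAI

    client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

    TASKS = ["social media", "create code", "other"]  # add new task labels here

    def classify_intent(user_input: str) -> str:
        """Ask the model to label the input with one of the known tasks."""
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system",
                 "content": f"Classify the request as one of: {', '.join(TASKS)}. "
                            "Reply with the label only."},
                {"role": "user", "content": user_input},
            ],
            temperature=0,
            max_tokens=10,
        )
        label = response.choices[0].message.content.strip().lower()
        return label if label in TASKS else "other"

    def branch_on_task(label: str, base_response: str) -> str:
        """Route the base_model response based on the classified task."""
        if label == "social media":
            # hand off to the specialized social-media model here
            return base_response
        if label == "create code":
            # hand off to the specialized code model here
            return base_response
        return base_response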

PYTHON LIBRARIES:
requests, beautifulsoup4, tiktoken, pandas, numpy, openai, tqdm, scipy (plus the standard library modules imported below)

PYTHON CODE BLOCK:

import os
import json
import re
import time
import requests
import tiktoken
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse
from openai import OpenAI
from typing import List, Dict, Any, Optional, Tuple
from tqdm import tqdm
from scipy.spatial.distance import cosine
from datetime import datetime

def remove_newlines(serie: pd.Series) -> pd.Series:
    """Remove newlines and extra spaces from a pandas Series of strings."""
    serie = serie.str.replace('\n', ' ')
    serie = serie.str.replace('\\n', ' ')
    serie = serie.str.replace('  ', ' ')
    serie = serie.str.replace('  ', ' ')
    return serie

class HyperlinkParser(HTMLParser):
    """HTML parser for extracting hyperlinks from web pages.
    
    Attributes:
        hyperlinks: List of extracted hyperlink URLs
    """
    def __init__(self):
        super().__init__()
        self.hyperlinks: List[str] = []

    def handle_starttag(self, tag: str, attrs: List[tuple]) -> None:
        """Extract href attributes from anchor tags."""
        attrs_dict = dict(attrs)
        if tag == "a" and "href" in attrs_dict:
            self.hyperlinks.append(attrs_dict["href"])

class WebCrawler:
    """Web crawler for domain-specific content extraction and Q&A functionality.
    
    Implements breadth-first crawling, content extraction, embedding generation,
    and question-answering capabilities using OpenAI's API.
    
    Attributes:
        client: OpenAI API client
        domain: Target domain to crawl
        start_url: Starting URL for crawling
        crawled_data: List of dictionaries containing crawled page data
        tokenizer: Tokenizer for text processing
    """
    
    HTTP_URL_PATTERN = re.compile(r'^https?://.+$')

    def __init__(self, api_key: str, domain: str, start_url: str):
        """Initialize crawler with API key and target domain information.
        
        Args:
            api_key: OpenAI API key
            domain: Domain to crawl
            start_url: Starting URL for crawling
        """
        self.client = OpenAI(api_key=api_key)
        self.domain = domain
        self.start_url = start_url
        self.crawled_data: List[Dict[str, str]] = []
        self.setup_directories()
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def print_ascii_histogram(self, data: pd.Series, bins: int = 10) -> None:
        """Generate and print ASCII histogram of numerical data.
        
        Args:
            data: Series of numerical values
            bins: Number of histogram bins
        """
        if len(data) == 0:
            print("No data to create histogram")
            return

        min_val = data.min()
        max_val = data.max()
        bin_width = (max_val - min_val) / bins if min_val != max_val else 1
        
        hist_data = []
        for i in range(bins):
            bin_start = min_val + i * bin_width
            bin_end = bin_start + bin_width
            count = len(data[(data >= bin_start) & (data < bin_end)])
            hist_data.append((bin_start, bin_end, count))
        
        max_count = max(count for _, _, count in hist_data)
        width = 40
        
        print("\nToken Count Distribution:")
        print("-" * 60)
        
        for bin_start, bin_end, count in hist_data:
            bar_length = int((count / max_count) * width) if max_count > 0 else 0
            bar = "█" * bar_length
            print(f"{bin_start:6.0f}-{bin_end:6.0f} | {count:4d} | {bar}")
        
        print("-" * 60)

    def setup_directories(self) -> None:
        """Create required directories for storing crawled and processed data."""
        os.makedirs(f"text/{self.domain}/", exist_ok=True)
        os.makedirs("processed", exist_ok=True)
        os.makedirs("raw_data", exist_ok=True)

    def save_crawled_data(self) -> str:
        """Save raw crawled data to timestamped JSON file.
        
        Returns:
            Path to saved file
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"raw_data/crawled_data_{self.domain}_{timestamp}.json"
        
        with open(filename, 'w') as f:
            json.dump({
                'domain': self.domain,
                'start_url': self.start_url,
                'crawl_time': timestamp,
                'data': self.crawled_data
            }, f, indent=2)
            
        print(f"\nRaw crawled data saved to: {filename}")
        return filename

    def load_crawled_data(self, filename: str) -> None:
        """Load previously crawled data from JSON file.
        
        Args:
            filename: Path to JSON file containing crawled data
        """
        try:
            with open(filename, 'r') as f:
                data = json.load(f)
                self.crawled_data = data['data']
                self.domain = data['domain']
                print(f"\nLoaded {len(self.crawled_data)} pages from {filename}")
        except Exception as e:
            print(f"Error loading crawled data: {str(e)}")

    def get_crawl_stats(self) -> Dict[str, Any]:
        """Calculate statistics about the crawled data.
        
        Returns:
            Dictionary containing page count, text length stats, and URL depth distribution
        """
        total_pages = len(self.crawled_data)
        total_text_length = sum(len(item['text']) for item in self.crawled_data)
        avg_text_length = total_text_length / total_pages if total_pages > 0 else 0
        
        urls_by_depth = {}
        for item in self.crawled_data:
            depth = item['url'].count('/') - 3  # Subtract 3 for http://domain.com/
            urls_by_depth[depth] = urls_by_depth.get(depth, 0) + 1
        
        return {
            'total_pages': total_pages,
            'total_text_length': total_text_length,
            'avg_text_length': avg_text_length,
            'urls_by_depth': urls_by_depth
        }
  
    def get_hyperlinks(self, url: str) -> List[str]:
        """Extract all hyperlinks from a webpage.
        
        Args:
            url: URL to extract links from
            
        Returns:
            List of hyperlink URLs found on the page
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            
            if not response.headers.get('Content-Type', '').startswith("text/html"):
                return []

            parser = HyperlinkParser()
            parser.feed(response.text)
            return parser.hyperlinks

        except requests.exceptions.RequestException as e:
            print(f"Error fetching {url}: {str(e)}")
            return []

    def clean_url(self, link: str) -> str:
        """Normalize and clean URL format.
        
        Args:
            link: URL to clean
            
        Returns:
            Cleaned and normalized URL
        """
        if link.startswith("/"):
            link = link[1:]
        return f"https://{self.domain}/{link}"

    def get_domain_hyperlinks(self, url: str) -> List[str]:
        """Get hyperlinks that belong to the same domain."""
        clean_links = set()
        
        for link in set(self.get_hyperlinks(url)):
            if not link or link.startswith(("mailto:", "#", "tel:")):
                continue

            if self.HTTP_URL_PATTERN.match(link):
                url_obj = urlparse(link)
                if url_obj.netloc != self.domain:
                    continue
                clean_link = link
            else:
                clean_link = self.clean_url(link)

            clean_links.add(clean_link.rstrip('/'))

        return list(clean_links)

    def get_page_title(self, soup: BeautifulSoup) -> str:
        """Extract page title and ensure it's a string."""
        if soup.title and soup.title.string:
            return str(soup.title.string).strip()
        return "No title"

    def get_page_text(self, soup: BeautifulSoup) -> str:
        """Extract and clean page text content."""
        try:
            text_elements = soup.stripped_strings
            text = ' '.join(str(element) for element in text_elements)
            return ' '.join(text.split())
        except Exception as e:
            print(f"Error extracting text: {str(e)}")
            return ""

    def crawl(self) -> List[Dict[str, str]]:
        """Perform the web crawling operation."""
        queue = deque([self.start_url])
        seen = {self.start_url}

        while queue:
            current_url = queue.popleft()
            print(f"Crawling: {current_url}")

            try:
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, "html.parser")
                
                text = self.get_page_text(soup)
                
                if "You need to enable JavaScript to run this app." in text:
                    print(f"JavaScript required for {current_url} - skipping")
                    continue

                self.crawled_data.append({
                    "url": current_url,
                    "text": text,
                    "title": self.get_page_title(soup)
                })

                for link in self.get_domain_hyperlinks(current_url):
                    if link not in seen:
                        queue.append(link)
                        seen.add(link)

            except requests.exceptions.RequestException as e:
                print(f"Error processing {current_url}: {str(e)}")
                continue

        return self.crawled_data

    def process_crawled_data(self) -> Tuple[pd.DataFrame, str]:
        """
        Process crawled data into a pandas DataFrame and save both raw and processed data.
        
        Returns:
            Tuple containing:
            - DataFrame with processed data
            - Path to the saved raw data file
        """
        # Save raw data first
        raw_data_path = self.save_crawled_data()
        
        # Process into DataFrame
        processed_data = []
        
        for item in self.crawled_data:
            fname = urlparse(item['url']).path
            if fname.startswith('/'):
                fname = fname[1:]
            
            fname = fname.replace('-', ' ').replace('_', ' ').replace('#update', '')
            
            text = f"{item['title']}. {item['text']}"
            text = ' '.join(text.split())
            
            processed_data.append({
                'title': fname,
                'text': text,
                'url': item['url'],
                'raw_title': item['title']
            })

        df = pd.DataFrame(processed_data)
        df['text'] = df.raw_title + ". " + remove_newlines(df.text)
        
        # Save processed data
        os.makedirs('processed', exist_ok=True)
        df.to_csv('processed/scraped.csv', index=False)
        
        return df, raw_data_path

    def split_into_many(self, text: str, max_tokens: int = 500) -> List[str]:
        """Split text into chunks of maximum token length."""
        sentences = text.split('. ')
        n_tokens = [len(self.tokenizer.encode(" " + sentence)) for sentence in sentences]
        
        chunks = []
        tokens_so_far = 0
        chunk = []

        for sentence, token in zip(sentences, n_tokens):
            if tokens_so_far + token > max_tokens:
                if chunk:
                    chunks.append(". ".join(chunk) + ".")
                chunk = []
                tokens_so_far = 0
            
            if token > max_tokens:
                continue
            
            chunk.append(sentence)
            tokens_so_far += token + 1
        
        if chunk:
            chunks.append(". ".join(chunk) + ".")
        
        return chunks

    def analyze_tokens(self, df: pd.DataFrame) -> pd.DataFrame:
        """Analyze token counts in the text data."""
        df['n_tokens'] = df.text.apply(lambda x: len(self.tokenizer.encode(x)))
        
        print("\nToken count statistics:")
        print(df['n_tokens'].describe())
        
        self.print_ascii_histogram(df['n_tokens'])
        
        return df

    def process_text_into_chunks(self, df: pd.DataFrame, max_tokens: int = 500) -> pd.DataFrame:
        """Process the DataFrame text into chunks."""
        shortened = []
        
        for _, row in df.iterrows():
            if row['text'] is None:
                continue
                
            if row['n_tokens'] > max_tokens:
                shortened.extend(self.split_into_many(row['text'], max_tokens))
            else:
                shortened.append(row['text'])
        
        chunks_df = pd.DataFrame({
            'text': shortened,
            'n_tokens': [len(self.tokenizer.encode(text)) for text in shortened]
        })
        
        print(f"\nOriginal number of documents: {len(df)}")
        print(f"Number of chunks created: {len(chunks_df)}")
        print(f"\nToken count statistics for chunks:")
        print(chunks_df['n_tokens'].describe())
        
        # Save the chunked data
        chunks_df.to_csv('processed/scraped_chunks.csv', index=False)
        
        return chunks_df

    def create_embeddings(self, df: pd.DataFrame, 
                         batch_size: int = 100, 
                         max_retries: int = 3, 
                         retry_delay: int = 5,
                         model: str = "text-embedding-ada-002") -> pd.DataFrame:
        """
        Create embeddings for the text data in batches with error handling and retries.
        
        Args:
            df: DataFrame containing the text data
            batch_size: Number of texts to process in each batch
            max_retries: Maximum number of retry attempts for failed requests
            retry_delay: Delay in seconds between retries
            model: The OpenAI embedding model to use
            
        Returns:
            DataFrame with embeddings added
        """
        print("\nGenerating embeddings...")
        
        def get_embedding(text: str, attempt: int = 1) -> List[float]:
            """Helper function to get embeddings with retry logic"""
            try:
                response = self.client.embeddings.create(
                    input=text,
                    model=model
                )
                return response.data[0].embedding
                
            except Exception as e:
                if attempt < max_retries:
                    print(f"Error getting embedding (attempt {attempt}): {str(e)}")
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    return get_embedding(text, attempt + 1)
                else:
                    print(f"Failed to get embedding after {max_retries} attempts: {str(e)}")
                    return [0.0] * 1536  # Return zero vector for failed embeddings
        
        # Initialize empty embeddings list
        all_embeddings = []
        
        # Process in batches with progress bar
        for i in tqdm(range(0, len(df), batch_size), desc="Processing batches"):
            batch = df.iloc[i:i+batch_size]
            
            # Get embeddings for the batch
            batch_embeddings = [get_embedding(text) for text in batch['text']]
            all_embeddings.extend(batch_embeddings)
            
            # Optional: Add delay between batches to avoid rate limits
            time.sleep(0.1)
        
        # Add embeddings to DataFrame
        df['embeddings'] = all_embeddings
        
        # Save embeddings
        self.save_embeddings(df)
        
        return df
    
    def save_embeddings(self, df: pd.DataFrame) -> None:
        """
        Save the DataFrame with embeddings in both CSV and pickle format.
        The pickle format preserves the numpy arrays better than CSV.
        """
        # Save as CSV (embeddings will be stored as string)
        csv_path = 'processed/embeddings.csv'
        df.to_csv(csv_path, index=False)
        print(f"\nEmbeddings saved to {csv_path}")
        
        # Save as pickle (preserves numpy arrays)
        pickle_path = 'processed/embeddings.pkl'
        df.to_pickle(pickle_path)
        print(f"Embeddings saved to {pickle_path}")

    def distances_from_embeddings(
        self,
        query_embedding: List[float],
        embeddings: List[List[float]],
        distance_metric: str = 'cosine'
    ) -> np.ndarray:
        """
        Calculate distances between a query embedding and a list of embeddings.
        
        Args:
            query_embedding: The embedding of the query text
            embeddings: List of embeddings to compare against
            distance_metric: The distance metric to use (currently supports 'cosine')
            
        Returns:
            Array of distances
        """
        distances = []
        for embedding in embeddings:
            if distance_metric == 'cosine':
                distance = cosine(query_embedding, embedding)
            else:
                raise ValueError(f"Unsupported distance metric: {distance_metric}")
            distances.append(distance)
        return np.array(distances)

    def create_context(
        self,
        input: str,
        df: pd.DataFrame,
        max_len: int = 1800,
        model: str = "text-embedding-ada-002"
    ) -> str:
        """
        Create a context for an input by finding the most similar content in the dataframe.
        
        Args:
            input: The input to create context for
            df: DataFrame containing the text data and embeddings
            max_len: Maximum length of the context in tokens
            model: The embedding model to use
            
        Returns:
            String containing the context
        """
        try:
            # Get the embeddings for the input
            response = self.client.embeddings.create(
                input=input,
                model=model
            )
            q_embeddings = response.data[0].embedding

            # Get the distances from the embeddings
            df['distances'] = self.distances_from_embeddings(
                q_embeddings,
                df['embeddings'].values.tolist()
            )

            returns = []
            cur_len = 0

            # Sort by distance and add the text to the context until the context is too long
            for _, row in df.sort_values('distances', ascending=True).iterrows():
                # Add the length of the text to the current length
                cur_len += row['n_tokens'] + 4

                # If the context is too long, break
                if cur_len > max_len:
                    break

                # Else add it to the text that is being returned
                returns.append(row["text"])

            # Return the context
            return "\n\n###\n\n".join(returns)

        except Exception as e:
            print(f"Error creating context: {str(e)}")
            return ""

    def answer_input(
        self,
        df: pd.DataFrame,
        input: str,
        model: str = "gpt-3.5-turbo",
        max_len: int = 1800,
        max_tokens: int = 150,
        temperature: float = 0,
        debug: bool = False,
        stop_sequence: Optional[List[str]] = None
    ) -> str:
        """
        Answer an input based on the most similar context from the dataframe texts.
        
        Args:
            df: DataFrame containing the text data and embeddings
            input: The input to answer
            model: The model to use for generating the answer
            max_len: Maximum length of the context in tokens
            max_tokens: Maximum tokens in the response
            temperature: Temperature for response generation
            debug: Whether to print debug information
            stop_sequence: Optional list of stop sequences
            
        Returns:
            String containing the answer
        """
        # Create context
        context = self.create_context(
            input,
            df,
            max_len=max_len,
        )

        # If debug, print the context
        if debug:
            print("Context:\n" + context)
            print("\n\n")

        if not context:
            return "Failed to create context for the input"

        try:
            # Create a chat completion using the input and context
            response = self.client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": "Always refer to the context below to form a response to all input, and if the input has no response based on the given context, say \"I don't know\"\n\n"
                    },
                    {
                        "role": "user",
                        "content": f"Context: {context}\n\n---\n\nInput: {input}\nResponse:"
                    }
                ],
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
                stop=stop_sequence,
            )
            return response.choices[0].message.content.strip()

        except Exception as e:
            print(f"Error generating answer: {str(e)}")
            return ""

    def send_input(self, df: pd.DataFrame) -> str:
        """
        Send the input to the model, using the crawled content as context.
        
        Args:
            df: DataFrame containing the text data and embeddings
        """
        input = normalize_input.data.input
        
        if not input:
            return "none"

        if input.lower() == 'exit':
            return "none"
            
        print("\nGenerating answer...")
        answer = self.answer_input(
            df,
            input=input,
            debug=True
        )
        print(f"\nAnswer: {answer}")
        return answer

def main():
    """Main function to run the web crawler and QA system."""
    api_key = retoolContext.configVars.openai_scraper
    if not api_key:
        raise ValueError("openai_scraper config variable not set")

    domain = normalize_input.data.url
    start_url = domain
    if domain.startswith('apidocs.trugard.ai'):
        start_url = "https://" + start_url + "/home"
    elif not domain.startswith('https'):
        start_url = "https://" + start_url

    crawler = WebCrawler(api_key, domain, start_url)
    
    # Option to load existing crawl data
    existing_crawls = False 
    # uncomment the line below to enable storing/loading of the crawl data
    # existing_crawls = [f for f in os.listdir("raw_data") if f.startswith(f"crawled_data_{domain}")]
    if existing_crawls:
        print("\nFound existing crawl data:")
        for i, file in enumerate(sorted(existing_crawls)):
            print(f"{i+1}. {file}")
        choice = input("\nEnter number to load existing crawl, or press Enter to crawl again: ")
        
        if choice.strip():
            try:
                crawler.load_crawled_data(f"raw_data/{existing_crawls[int(choice)-1]}")
            except (ValueError, IndexError):
                print("Invalid choice, proceeding with new crawl")
                crawler.crawl()
        else:
            crawler.crawl()
    else:
        # Perform new crawl
        crawler.crawl()
    
    # Print crawl statistics
    stats = crawler.get_crawl_stats()
    print("\nCrawl Statistics:")
    print(f"Total pages crawled: {stats['total_pages']}")
    print(f"Average text length: {stats['avg_text_length']:.2f} characters")
    print("\nPages by URL depth:")
    for depth, count in sorted(stats['urls_by_depth'].items()):
        print(f"Depth {depth}: {count} pages")
    
    # Process data
    df, raw_data_path = crawler.process_crawled_data()
    print(f"\nRaw data saved to: {raw_data_path}")
    
    # Analyze tokens and create chunks
    df = crawler.analyze_tokens(df)
    chunks_df = crawler.process_text_into_chunks(df)
    
    # Generate embeddings
    chunks_df_with_embeddings = crawler.create_embeddings(chunks_df)
    
    # Send AI Input with embeddings
    return crawler.send_input(chunks_df_with_embeddings)

return main()
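
If you run this outside a workflow (the RPC/server case mentioned above), retoolContext and normalize_input don't exist, so the pipeline has to be driven directly. A minimal sketch, assuming the API key is exported as OPENAI_API_KEY and the URL/question arrive as arguments (for example from your RPC handler):

    import os

    def run_standalone(domain: str, question: str) -> str:
        """Drive WebCrawler directly, without Retool workflow blocks."""
        api_key = os.environ["OPENAI_API_KEY"]  # assumed env var name
        start_url = domain if domain.startswith("https") else "https://" + domain

        crawler = WebCrawler(api_key, domain, start_url)
        crawler.crawl()

        df, _ = crawler.process_crawled_data()
        df = crawler.analyze_tokens(df)
        chunks_df = crawler.process_text_into_chunks(df)
        chunks_df = crawler.create_embeddings(chunks_df)

        # answer_input is called directly here instead of send_input,
        # because send_input reads from the normalize_input workflow block.
        return crawler.answer_input(chunks_df, input=question)

    # Example:
    # print(run_standalone("example.com", "What does this site do?"))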

Original code by OpenAI
