主页 > 手机  > 

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)
6.环境配置 python=3.9 langchain==0.1.13 langchain-community==0.0.29 torch==2.0.1 transformers>=4.38.2 timm>=0.9.16 accelerate sentencepiece attrdict einops # for gradio demo gradio==3.48.0 gradio-client==0.6.1 mdtex2html==1.3.0 pypinyin==0.50.0 tiktoken==0.5.2 tqdm==4.64.0 colorama==0.4.5 Pygments==2.12.0 markdown==3.4.1 SentencePiece==0.1.96 requests fake_useragent hashlib playwright

其中playwright安装后需要安装对应的浏览器驱动,使用

playwright install

进行安装。

7.模块实现 7.1. 代理池建立

目的:获取代理IP,用于爬取数据时使用,频繁使用本地的IP地址可能会导致IP被封禁。本文仅供展示,因此仅选取部分免费的代理IP爬取建立线程池,免费的代理IP大多数质量不高,因此需要定期更新代理池或使用付费代理。

实现思路:   - 1.读取json文件中的代理IP获取网址,因为网址的每一页包含多条代理IP,并且网址的每一页的URL格式相同,因此可以采用多线程的方式快速获取所有代理IP。将获取到的代理IP存入txt文件中,每日定时更新。   - 2.模拟人为请求,在爬取过程中模拟访问浏览器,而非单纯使用request库发起请求,避免被网站封禁。

代码实现:

from playwright.sync_api import sync_playwright import time import random from fake_useragent import UserAgent import json from concurrent.futures import ThreadPoolExecutor def fetch_ip(url):     """每个线程使用独立的浏览器实例"""     with sync_playwright() as p:         try:             # 初始化浏览器实例             browser = p.chromium.launch(                 headless=True,                 args=[                     '--disable-blink-features=AutomationControlled',                     '--no-sandbox',                     '--disable-setuid-sandbox'                 ]             )                         # 创建新的上下文             context = browser.new_context(                 user_agent=get_stealth_headers()['User-Agent'],                 viewport={'width': 1920, 'height': 1080},                 extra_http_headers=get_stealth_headers()             )                         # 反检测脚本             context.add_init_script("""                 Object.defineProperty(navigator, 'webdriver', {                     get: () => undefined                 })             """)             page = context.new_page()                         # 随机鼠标移动             page.mouse.move(                 random.randint(0, 1920),                 random.randint(0, 1080)             )                         # 访问页面             page.goto(url, timeout=60000)                         # 随机滚动             for _ in range(random.randint(1, 3)):                 page.mouse.wheel(0, random.randint(300, 800))                 time.sleep(0.5)                         # 等待表格加载             page.wait_for_selector('table tbody tr', timeout=15000)                         # 解析数据             rows = page.query_selector_all('table tbody tr')             ip_list = []             for row in rows:                 try:                     ip = row.query_selector('td:first-child').inner_text().strip()                     port = row.query_selector('td:nth-child(2)').inner_text().strip()                     ip_list.append(f'{ip}:{port}')                 except:                     continue                                 return ip_list                     except Exception as e:             print(f'请求 {url} 失败: {str(e)}')             return []         finally:             # 确保资源释放             if 'page' in locals():                 page.close()             if 'context' in locals():                 context.close()             if 'browser' in locals():                 browser.close() def get_stealth_headers():     ua = UserAgent()     config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))     return {         'User-Agent': ua.random,         'Accept-Language': 'en-US,en;q=0.9',         'Referer': config['referer']     } def main():     config = json.load(open('docs/ip_crawl_config.json', 'r', encoding='utf-8'))     base_url = config['base_url']     urls = [f'{base_url}/{page_num}/' for page_num in range(1, 20)]         with ThreadPoolExecutor(max_workers=4) as executor:         results = executor.map(fetch_ip, urls)                 with open('proxies.txt', 'w', encoding='utf-8') as f:             total = 0             for idx, ip_list in enumerate(results, 1):                 if ip_list:                     f.write('\n'.join(ip_list) + '\n')                     print(f'第 {idx} 页获取到 {len(ip_list)} 条代理')                     total += len(ip_list)             print(f'共获取 {total} 条代理') if __name__ == '__main__':     main() 7.2. 数据爬取

目的:获取所要生成文案的相关数据,本项目所需要的是商品市场分析的有关数据,因此需要爬取对应平台以及第三方网站。

实现思路:   - 1.分析网站数据爬取思路,若网站数据为静态加载,固定格式,可以直接通过python中的网页分析库进行爬取,例如beautifulsoup4等。   - 2.由于本项目所需爬取的网站结构相对复杂,因此使用了抓包的方式进行爬取。爬虫过程涉及js逆向分析,需要分析js代码,找到数据来源,通过python的requests库进行数据爬取。(感兴趣的朋友可以查看我先前发的文章,有详细讲解js逆向分析的过程)   - 3.爬取数据后,对数据进行清洗,去除无用数据,将数据存储到本地json文件,方便后续使用。

代码实现:

''' Author: yeffky Date: 2025-02-09 17:18:05 LastEditTime: 2025-02-15 15:33:48 ''' import requests import random import time import json from fake_useragent import UserAgent import hashlib import time from datetime import datetime # 定义md5加密函数 def md5_hash(s):     return hashlib.md5(s.encode('utf-8')).hexdigest() # Le函数,类似于JS中的Le函数 def getAuth(e):     # 第一次MD5,将输入转换为字符串并计算MD5     first_md5 = hashlib.md5(str(e).encode()).hexdigest()     # 拼接字符串并第二次计算MD5     second_md5 = hashlib.md5(combined.encode()).hexdigest()     return second_md5 def get_random_proxy():     # 尝试打开名为 'proxies.txt' 的文件     try:         with open('proxies.txt', 'r') as f:             # 读取文件内容并按行分割成列表             proxies = f.read().splitlines()             # 如果列表不为空,则随机选择一个代理并返回             if proxies:                 return random.choice(proxies)     # 如果文件不存在,则捕获 FileNotFoundError 异常并忽略     except FileNotFoundError:         pass     # 如果文件不存在或列表为空,则返回 None     return None def remove_invalid_proxy(proxy):     # 尝试执行以下代码块,如果发生异常则跳转到except块     try:         # 打开名为'proxies.txt'的文件,以读取模式('r')         with open('proxies.txt', 'r') as f:             proxies = f.read().splitlines()                 if proxy in proxies:         # 检查传入的proxy是否在proxies列表中             proxies.remove(proxy)                 with open('proxies.txt', 'w') as f:             f.write('\n'.join(proxies) + '\n')     except FileNotFoundError:         pass def make_request(url, payload, auth, timestamp):     # 创建一个UserAgent对象,用于生成随机的User-Agent字符串     ua = UserAgent()     headers = {'User-Agent': ua.random, 'Auth': str(auth), 'Timestamp': str(timestamp), "Content-Type":"application/json",}     proxy = get_random_proxy() # 获取一个随机的代理     proxies = {'http': proxy, 'https': proxy} if proxy else None         try:         response = requests.post(             url,             headers=headers,             timeout=10,             proxies=proxies,             json=payload,             # verify=False         )         response.raise_for_status()         print(response)         return response.json()     except requests.exceptions.RequestException as e:         print(f"Request failed: {e}")                 # If the request failed, remove the invalid proxy         if proxy:             remove_invalid_proxy(proxy)                 # Try a new proxy         return None def crawl_data():     # 加载爬虫配置文件     crawal_config = json.load(open('./docs/crawl_config.json', 'r'))     base_url = crawal_config['base_url']  # 获取基础URL     payload = crawal_config['payload']  # 获取请求负载         response_text = None     while not response_text:  # Retry until the request is successful         # 时间戳         t = int((time.time()) * 1000)  # 获取当前时间戳(毫秒)         auth = getAuth(t)         response_text = make_request(base_url, payload, auth, t)         if not response_text:             print("Retrying with a new proxy...")             time.sleep(random.randint(3, 5))  # Add a small delay before retrying     try:         data = response_text         items_list = data.get('data', {}).get('list', [])                 # 获取今天的日期         today_date = datetime.now().strftime('%Y-%m-%d')         # 在文件名后加上今天的日期         filename = f'goods_{today_date}.json'         cnt = 0         with open(f'data/{filename}', 'w', encoding='utf-8') as f:             f.write('{"items": [' + '\n')             for item in items_list:                 item['商品涨幅'] = item.pop('Priceincrease')                 item['商品成交量'] = item.pop('SaleCount')                 item['商品名称'] = item.pop('GoodsName')                 item['商品最低价'] = item.pop('price')                 item['商品图标'] = item.pop('iconUrl')                 cnt += 1                 item_str = json.dumps(item, ensure_ascii=False, indent=2)                 if cnt == len(items_list):                     f.write(item_str + '\n')                 else:                     f.write(item_str + ',\n\n')             f.write(']}' + '\n')         print(f"Successfully wrote {len(items_list)} items to goods.txt")     except json.JSONDecodeError:         print("Failed to decode JSON response") if __name__ == "__main__":     crawl_data()
标签:

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(爬虫模块篇)