主页 > 软件开发  > 

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(数据分析篇)

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(数据分析篇)
7.3. 数据监控与生成本地知识库 目的:监控新生成的小红书文案,记录每一次生成的小红书文案风格。后续根据输入topic,检索与某一topic有关的文案,可以根据先前的文案风格,生成类似风格的文案。实现思路: 1.要实现文件监控功能,需要使用watchdog库。watchdog是一个Python库,用于监控文件系统的变化。它提供了多种事件类型,如文件创建、修改、删除等,可以用来监控文件的变化。启动一个线程,实时监控xiaohongshu_drafts目录下的文件变化,当有新文件生成时,调用process_new_file(file_path)函数生成知识库。2.process_new_file(file_path)函数读取新文件中的内容,并调用generate_knowledge_base()函数对新生成的文案进行文本分割、对象转换、向量化等一系列操作来生成知识库。 代码实现: ''' Author: yeffky Date: 2025-02-12 13:29:31 LastEditTime: 2025-02-17 14:28:11 ''' from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from langchain_community.document_loaders import TextLoader from langchain_community.vectorstores import FAISS from langchain.embeddings.huggingface import HuggingFaceEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter from transformers import AutoTokenizer from langchain.schema import Document from text2vec import SentenceModel import time import os class NewFileHandler(FileSystemEventHandler): def on_created(self, event): if not event.is_directory: file_path = event.src_path process_new_file(file_path) # 处理新文件 print(f"新增文件已加载: {file_path}") def process_new_file(file_path): # 加载文档(假设为txt格式) loader = TextLoader(file_path, encoding="utf-8") documents = loader.load() print(type(documents[0])) # 从 Hugging Face 加载一个标记器 tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese") text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer( tokenizer, chunk_size=256, chunk_overlap=0, separators=['---'] ) # 进行文本分块 text_chunks = text_splitter.split_text(documents[0].page_content) # 将分块结果转换为文档对象列表 chunk_docs = [Document(page_content=chunk) for chunk in text_chunks] # 向量化并更新索引 embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh") vector_store_dir = "./vector_store" if os.path.exists(vector_store_dir): vector_store = FAISS.load_local(vector_store_dir, embeddings, allow_dangerous_deserialization=True) vector_store.add_documents(chunk_docs) else: vector_store = FAISS.from_documents(chunk_docs, embeddings) vector_store.save_local(vector_store_dir) def start_observer(): observer = Observer() observer.schedule(NewFileHandler(), path="./xiaohongshu_drafts", recursive=False) observer.start() try: while True: # 每隔一小时检查一次 time.sleep(1) except KeyboardInterrupt: # 当用户按下 Ctrl+C 时,停止观察者 observer.stop() # 等待观察者线程结束 observer.join() if __name__ == "__main__": start_observer() 7.4. 数据分析 目的:根据先前爬取数据,与本地知识库联动,调用deepseek api完成小红书文案生成。实现思路: 要通过deepseek生成文案首先需要构建prompt和提问词,首先从文件中加载prompt和提问词模板,然后将前期爬取的商品数据以及本地知识库中的文案数据作为输入,构建prompt和提问词。 代码实现: ''' Author: yeffky Date: 2025-02-11 11:17:04 LastEditTime: 2025-02-17 15:35:13 ''' import json import os import requests from datetime import datetime import random from langchain import FAISS from langchain.embeddings.huggingface import HuggingFaceEmbeddings from text2vec import SentenceModel os.environ['HF_ENDPOINT'] = 'hf-mirror ' # 获取今天的日期 today_date = datetime.now().strftime('%Y-%m-%d') topic = "手机推荐" # 1. 读取JSON文件 def read_json_file(filename): with open(f'data/{filename}', 'r', encoding='utf-8') as f: return json.load(f) # 2. 构造分析提示词 def build_prompt(item): with open('./docs/prompt.txt', 'r', encoding='utf-8') as f: prompt = f.read() # 创建文本嵌入对象 embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh") # 检索知识库(扩大检索范围) vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True) retrieved_docs = vector_store.similarity_search(topic, k=5) # 检索Top 5 # 随机选择3个不同风格的参考文案 random.shuffle(retrieved_docs) selected_docs = retrieved_docs[:3] return f"""{prompt}, {json.dumps(item, ensure_ascii=False, indent=2)} **根据以下文案风格,做出创新**: {selected_docs} **注意**: - 在结尾加入提示,数据截至当前日期:{today_date} - 每一段内容使用 --- 进行分割 """ def build_preset(): with open('./docs/preset.txt', 'r', encoding='utf-8') as f: preset = f.read() # 创建文本嵌入对象 embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh") print("embeddings加载完毕") # 检索知识库(扩大检索范围) vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True) retrieved_docs = vector_store.similarity_search(topic, k=5) # 检索Top 5 # 随机选择3个不同风格的参考文案 random.shuffle(retrieved_docs) selected_docs = retrieved_docs[:3] preset += f"""\n **主题**:{topic} **创新要求**: - 使用{random.choice(["轻松幽默", "专业严谨", "犀利吐槽"])}的语气 - 加入{["emoji表情", "热门梗", "互动提问"]}元素 """ print(preset) return preset # 3. 调用Deepseek API def get_deepseek_response(preset, prompt, api_key): url = " api.deepseek /chat/completions" headers = { "Authorization": f"Bearer {api_key}", 'Content-Type': 'application/json', 'Accept': 'application/json', } payload = json.dumps({ "messages": [ { "content": preset, "role": "system" }, { "content": prompt, "role": "user" } ], "model": "deepseek-reasoner", "frequency_penalty": 0, "max_tokens": 2048, "presence_penalty": 0, "response_format": { "type": "text" }, "stop": None, "stream": False, "stream_options": None, "temperature": 1, "top_p": 1, "tools": None, "tool_choice": "none", "logprobs": False, "top_logprobs": None }) response = None while not response: try: response = requests.post(url, data=payload, headers=headers, timeout=100) response.raise_for_status() if not response.json(): response = None print("没有收到响应,重试中...") else: print("收到响应,内容为:\n" + response.json()['choices'][0]['message']['content']) except requests.exceptions.RequestException as e: print(f"请求失败:{str(e)}") response = None return response.json()['choices'][0]['message']['content'] # 4. 保存文案文件 def save_copywriting(content): base_path = f'./xiaohongshu_drafts/' filename = f"小红书_推广文案_千战系列" + today_date + ".txt" print(content) with open(base_path + filename, 'w', encoding='utf-8') as f: f.write(content) print(f"文案已保存至:{filename}") # 主流程 def analysis_data(): # 配置参数 API_KEY = os.getenv("DEEPSEEK_API_KEY") # 从环境变量获取API密钥 # 在文件名后加上今天的日期 JSON_FILE = f'goods_{today_date}.json' # 读取数据 items = read_json_file(JSON_FILE) print(f"正在处理:{JSON_FILE}") # 构造提示词 prompt = build_prompt(items) preset = build_preset() # 获取AI响应 try: response = get_deepseek_response(preset, prompt, API_KEY) save_copywriting(response) except Exception as e: print(f"处理失败:{str(e)}") if __name__ == "__main__": analysis_data()
标签:

使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(数据分析篇)由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(数据分析篇)