主页 > 其他  > 

项目准备(flask+python+MachineLearning)-3

项目准备(flask+python+MachineLearning)-3

目录

1.商品信息

2. 商品销售预测

2.1 机器学习

2.2 预测功能

3. 模型评估


# 1. Product information
@app.route('/products')
def products():
    """Render the product analysis page with a BCG-matrix classification.

    Sales revenue (quantity * unit price) per product is compared between
    the most recent 30 days of data and the 30 days before that.  Each
    product gets a growth rate and a relative market share (its revenue
    divided by the best-selling product's revenue), and is placed in one of
    four categories using the medians of those two metrics as thresholds.

    Returns:
        The rendered ``products.html`` template, which receives
        ``category_statistics`` (per-category product count, sales amount
        and share of total sales), ``growth_rate_threshold`` and
        ``market_share_threshold``.
    """
    data = load_data()

    # Both windows are anchored on the newest timestamp in the data set,
    # so compute it once instead of on every comparison.
    latest = data['成交时间'].max()
    current_start = latest - timedelta(days=30)
    previous_start = latest - timedelta(days=60)

    sales = data[['商品ID', '成交时间']].assign(revenue=data['销量'] * data['单价'])
    in_current = sales['成交时间'] >= current_start
    in_previous = (
        (sales['成交时间'] < current_start)
        & (sales['成交时间'] >= previous_start)
    )

    # Summing a pre-computed revenue column always yields a Series, even
    # when a window has no rows; groupby().apply() does not guarantee that.
    current_sales = sales[in_current].groupby('商品ID')['revenue'].sum()
    previous_sales = sales[in_previous].groupby('商品ID')['revenue'].sum()

    product_metrics = pd.DataFrame({
        'current_sales': current_sales,
        'previous_sales': previous_sales
    }).fillna(0)

    # A product with no sales in the previous window has no defined growth
    # rate.  Masking the zero denominator turns those rows into NaN (then 0)
    # instead of +/-inf, which fillna() would not catch and which could make
    # the median threshold infinite.
    previous = product_metrics['previous_sales']
    growth = (product_metrics['current_sales'] - previous) / previous.where(previous != 0)
    product_metrics['growth_rate'] = growth.fillna(0)

    # Relative market share: revenue as a fraction of the top seller's.
    top_sales = product_metrics['current_sales'].max()
    if pd.notna(top_sales) and top_sales > 0:
        product_metrics['market_share'] = product_metrics['current_sales'] / top_sales
    else:
        # No revenue at all in the current window: avoid dividing by zero.
        product_metrics['market_share'] = 0.0

    # BCG matrix classification, using the medians as the quadrant borders.
    growth_rate_threshold = product_metrics['growth_rate'].median()
    market_share_threshold = product_metrics['market_share'].median()

    high_growth = product_metrics['growth_rate'] >= growth_rate_threshold
    high_share = product_metrics['market_share'] >= market_share_threshold

    # Boolean masks instead of a row-wise apply(): same quadrants, but this
    # also works when there are no products at all.
    product_metrics['category'] = '瘦狗'
    product_metrics.loc[high_growth & high_share, 'category'] = '明星商品'
    product_metrics.loc[high_growth & ~high_share, 'category'] = '问题商品'
    product_metrics.loc[~high_growth & high_share, 'category'] = '现金牛'

    # Per-category summary.
    category_stats = product_metrics.groupby('category').agg(
        product_count=('current_sales', 'count'),
        sales_amount=('current_sales', 'sum'),
    )
    total_sales = category_stats['sales_amount'].sum()
    if total_sales > 0:
        category_stats['sales_percentage'] = category_stats['sales_amount'] / total_sales
    else:
        category_stats['sales_percentage'] = 0.0

    return render_template('products.html',
                           category_statistics=category_stats.to_dict('index'),
                           growth_rate_threshold=float(growth_rate_threshold),
                           market_share_threshold=float(market_share_threshold))
# 2.
# Sales forecasting
# 2.1 Machine learning
def prepare_features(data):
    """Build the feature matrix and target vector used by the sales model.

    Args:
        data: DataFrame with the columns '销量', '单价', '类别ID', '门店编号'
            and a datetime-like '成交时间' column (it is read through the
            ``.dt`` accessor).  The caller's frame is not modified.

    Returns:
        A tuple ``(X, y, le_category, le_store)``: the float feature frame,
        the float target series ('销量'), and the two fitted
        ``LabelEncoder`` objects for category and store.
    """
    # Drop incomplete rows.  The timestamp is included because the time
    # features below are derived from it and would otherwise become NaN.
    # .copy() makes the column assignments below write to an independent
    # frame rather than to a possible view of the caller's data.
    required = ['销量', '单价', '类别ID', '门店编号', '成交时间']
    data = data.dropna(subset=required).copy()

    # Time features.
    timestamps = data['成交时间'].dt
    data['weekday'] = timestamps.weekday
    data['month'] = timestamps.month
    data['hour'] = timestamps.hour

    # Encode identifiers as strings so that lookups at prediction time
    # (which arrive as form strings) use the same representation.
    le_category = LabelEncoder()
    le_store = LabelEncoder()
    data['类别编码'] = le_category.fit_transform(data['类别ID'].astype(str))
    data['门店编码'] = le_store.fit_transform(data['门店编号'].astype(str))

    features = ['类别编码', '门店编码', '单价', 'weekday', 'month', 'hour']
    target = '销量'

    # Make sure every feature is numeric.
    X = data[features].astype(float)
    y = data[target].astype(float)

    return X, y, le_category, le_store


# Module-level state shared by the prediction routes; filled in by
# initialize_model().
model = None
scaler = None
label_encoder_category = None
label_encoder_store = None

# Hour of day fed to the model at prediction time; the form does not ask
# for one.
_DEFAULT_HOUR = 12


def initialize_model():
    """Train the decision-tree model and publish it in the module globals.

    Returns:
        True when the model, scaler and both encoders were set; False when
        loading or training failed (the error is printed and the globals
        are left untouched).
    """
    global model, scaler, label_encoder_category, label_encoder_store

    try:
        data = load_data()
        X, y, le_category, le_store = prepare_features(data)

        # Same split as the evaluation code; only the training part is
        # needed here.
        X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42)

        fitted_scaler = StandardScaler()
        X_train_scaled = fitted_scaler.fit_transform(X_train)

        fitted_model = DecisionTreeRegressor(random_state=42, max_depth=10)
        fitted_model.fit(X_train_scaled, y_train)
    except Exception as e:
        print(f"模型初始化错误: {str(e)}")
        return False

    # Publish everything only after training succeeded, so the globals are
    # never left half-initialised.
    model = fitted_model
    scaler = fitted_scaler
    label_encoder_category = le_category
    label_encoder_store = le_store
    return True


def _percent_of_average(value, average):
    """Return value as a rounded percentage of average (100 if average is NaN or <= 0)."""
    return round(value / average * 100) if average > 0 else 100


def _rounded_or_zero(average):
    """Round an average, treating NaN (no matching rows) as 0."""
    return round(average if not pd.isna(average) else 0)


# 2.2 Prediction endpoint
@app.route('/predict', methods=['POST'])
def predict():
    """Handle a prediction request posted from the prediction form.

    Reads 'category', 'store', 'price', 'weekday' and 'month' from the form,
    predicts the sales quantity, and compares it with historical averages
    for the same category, store, price band (+/-10%) and weekday/month.

    Returns:
        A JSON response with the prediction and the comparison figures, or
        a JSON ``{'error': ...}`` body with status 500 (model could not be
        initialised) or 400 (any other failure).
    """
    try:
        # Initialise lazily if no model has been trained yet.
        if model is None or scaler is None:
            if not initialize_model():
                return jsonify({'error': '模型初始化失败'}), 500

        # Form data.
        form = request.form
        category = form['category']
        store = form['store']
        price = float(form['price'])
        weekday = int(form['weekday'])
        month = int(form['month'])

        try:
            # The encoders raise ValueError for labels they have not seen.
            category_encoded = label_encoder_category.transform([str(category)])[0]
            store_encoded = label_encoder_store.transform([str(store)])[0]
        except ValueError as e:
            return jsonify({'error': f'无效的输入数据: {str(e)}'}), 400

        # One-row frame with the same column order used for training.
        pred_data = pd.DataFrame(
            [[category_encoded, store_encoded, price, weekday, month, _DEFAULT_HOUR]],
            columns=['类别编码', '门店编码', '单价', 'weekday', 'month', 'hour'])

        pred_data_scaled = scaler.transform(pred_data)
        prediction = model.predict(pred_data_scaled)[0]

        # Sales quantities are reported as non-negative whole numbers.
        prediction = max(0, round(prediction))

        store_info = STORE_INFO.get(store, {})
        store_name = store_info.get('name', f'门店{store}')

        # Historical data for the comparison figures.
        data = load_data()
        quantity = data['销量']

        category_avg = quantity[data['类别ID'].astype(str) == str(category)].mean()
        store_avg = quantity[data['门店编号'].astype(str) == str(store)].mean()

        # Average over rows priced within +/-10% of the requested price.
        price_range = 0.1
        price_lower = price * (1 - price_range)
        price_upper = price * (1 + price_range)
        in_price_band = (data['单价'] >= price_lower) & (data['单价'] <= price_upper)
        price_avg = quantity[in_price_band].mean()

        # Average over rows with the same weekday and month.
        same_time = (
            (data['成交时间'].dt.weekday == weekday)
            & (data['成交时间'].dt.month == month)
        )
        time_avg = quantity[same_time].mean()

        analysis = {
            'category_comparison': _percent_of_average(prediction, category_avg),
            'store_comparison': _percent_of_average(prediction, store_avg),
            'price_comparison': _percent_of_average(prediction, price_avg),
            'time_comparison': _percent_of_average(prediction, time_avg),
            'category_avg': _rounded_or_zero(category_avg),
            'store_avg': _rounded_or_zero(store_avg),
            'price_avg': _rounded_or_zero(price_avg),
            'time_avg': _rounded_or_zero(time_avg)
        }

        return jsonify({
            'prediction': int(prediction),
            'category': category,
            'category_name': CATEGORY_NAMES.get(category, f'类别{category}'),
            'store': store,
            'store_name': store_name,
            'price': price,
            'weekday': weekday,
            'month': month,
            'analysis': analysis
        })

    except Exception as e:
        # NOTE(review): server-side failures (e.g. load_data errors) are also
        # reported as 400 here; consider returning 500 for those.
        print(f"预测错误: {str(e)}")
        return jsonify({'error': str(e)}), 400


@app.route('/prediction')
def prediction_page():
    """Render the sales prediction page with category and store options."""
    data = load_data()
    categories = sorted(data['类别ID'].astype(str).unique().tolist())
    stores = sorted(data['门店编号'].astype(str).unique().tolist())

    # Category options carry both the id and a display name.
    category_options = [
        {'id': cat_id, 'name': CATEGORY_NAMES.get(cat_id, f'类别{cat_id}')}
        for cat_id in categories
    ]

    # Store options, built the same way.
    store_options = [
        {'id': store_id, 'name': STORE_INFO.get(store_id, {}).get('name', f'门店{store_id}')}
        for store_id in stores
    ]

    # Train the model up front if that has not happened yet.
    if model is None:
        initialize_model()

    return render_template('prediction.html',
                           categories=category_options,
                           stores=store_options)


# 3. Model evaluation
@app.route('/model_evaluation')
def model_evaluation():
    """Render the model evaluation page from a freshly trained model."""
    data = load_data()

    # The encoders are not needed for evaluation.
    X, y, _, _ = prepare_features(data)

    # Train and collect the evaluation artefacts; the model and scaler
    # themselves are not used by the template.
    (_, _, metrics, feature_importance, scatter_data, residual_data,
     feature_names, importance_scores) = train_models(X, y)

    return render_template('model_evaluation.html',
                           metrics=metrics,
                           feature_importance=feature_importance,
                           scatter_data=scatter_data,
                           residual_data=residual_data,
                           feature_names=feature_names,
                           importance_scores=importance_scores)

4. 训练模型

def _safe_correlation(feature, target):
    """Return the Pearson correlation of two sequences, or 0.0 if undefined.

    The correlation is undefined when either input has zero variance or
    fewer than two values; ``np.corrcoef`` would return NaN (and warn) in
    that case.
    """
    feature = np.asarray(feature, dtype=float)
    target = np.asarray(target, dtype=float)
    if feature.size < 2 or np.std(feature) == 0 or np.std(target) == 0:
        return 0.0
    value = np.corrcoef(feature, target)[0, 1]
    return 0.0 if np.isnan(value) else float(value)


def train_models(X, y):
    """Train a decision-tree regressor and collect evaluation data.

    Args:
        X: Feature DataFrame (its column names are used as feature names).
        y: Target values aligned with ``X``.

    Returns:
        A tuple ``(model, scaler, metrics, feature_importance, scatter_data,
        residual_data, feature_names, importance_scores)`` where

        * ``metrics`` holds 'r2_score', 'mse', 'mae' and 'rmse' on the
          held-out 20% test split,
        * ``feature_importance`` is a list of dicts with 'name',
          'importance' and 'correlation' (feature vs. target over all rows),
        * ``scatter_data`` is a list of ``[actual, predicted]`` pairs,
        * ``residual_data`` is a list of ``[predicted, residual]`` pairs.
    """
    # Hold out 20% of the rows for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the scaler on the training rows only, then apply it to both parts.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    dt_model = DecisionTreeRegressor(random_state=42, max_depth=10)
    dt_model.fit(X_train_scaled, y_train)

    y_pred = dt_model.predict(X_test_scaled)

    # RMSE is derived from the MSE, so compute the MSE once.
    mse = mean_squared_error(y_test, y_pred)
    metrics = {
        'r2_score': r2_score(y_test, y_pred),
        'mse': mse,
        'mae': mean_absolute_error(y_test, y_pred),
        'rmse': float(np.sqrt(mse))
    }

    feature_names = X.columns.tolist()
    importance_scores = dt_model.feature_importances_.tolist()

    # Constant features (e.g. a single store) have no defined correlation;
    # _safe_correlation reports 0.0 for them instead of NaN.
    feature_importance = [
        {
            'name': name,
            'importance': importance,
            'correlation': _safe_correlation(X[name], y)
        }
        for name, importance in zip(feature_names, importance_scores)
    ]

    # Chart data as plain floats.
    scatter_data = [[float(actual), float(pred)] for actual, pred in zip(y_test, y_pred)]
    residuals = y_test - y_pred
    residual_data = [[float(pred), float(residual)] for pred, residual in zip(y_pred, residuals)]

    return (dt_model, scaler, metrics, feature_importance, scatter_data,
            residual_data, feature_names, importance_scores)

标签:

项目准备(flask+python+MachineLearning)-3由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“项目准备(flask+python+MachineLearning)-3”