用Python和sklearn搞定百度慧眼数据:从抓包到坐标转换的完整实战

张开发
2026/4/16 2:52:37 15 分钟阅读

分享文章

用Python和sklearn搞定百度慧眼数据:从抓包到坐标转换的完整实战
Python实战百度慧眼数据爬取与坐标转换全流程解析当我们需要分析城市人流分布时百度慧眼提供的热力图数据是个不错的选择。但直接从API获取的数据往往需要经过一系列处理才能用于分析。本文将带你完整走通从数据获取到坐标转换的整个流程使用Python和sklearn构建一个健壮的数据处理管道。1. 准备工作与环境搭建在开始之前确保你已经安装了以下Python库pip install requests pandas scikit-learn这些库将分别用于requests发送HTTP请求获取数据pandas数据处理和分析scikit-learn机器学习模型用于坐标转换此外建议使用Jupyter Notebook进行交互式开发方便调试和可视化中间结果。2. 数据获取从抓包到API请求2.1 分析API接口通过浏览器开发者工具F12分析百度慧眼的热力图请求我们发现关键API端点https://huiyan.baidu.com/openapi/v1/heatmap/heatmapsearch这个接口需要两个关键参数cityId城市代码如深圳为440300ak百度开发者密钥2.2 构建Python请求使用requests库构建请求import requests headers { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64), Referer: http://huiyan.baidu.com/ } params { cityId: 440300, # 深圳城市代码 ak: 你的百度API密钥 # 替换为实际密钥 } response requests.get( https://huiyan.baidu.com/openapi/v1/heatmap/heatmapsearch, headersheaders, paramsparams ) if response.status_code 200: data response.json() else: print(f请求失败状态码{response.status_code})2.3 数据解析获取的数据格式通常如下12679967_2573996_15|12667228_2576368_8|...我们需要将其转换为结构化的DataFrameimport pandas as pd def parse_heatmap_data(json_data): raw_str json_data[result][data] records [x.split(_) for x in raw_str.split(|) if x] df pd.DataFrame(records, columns[x, y, value]) df df.apply(pd.to_numeric, errorscoerce).dropna() return df heatmap_df parse_heatmap_data(data)3. 坐标转换从墨卡托到经纬度3.1 理解坐标系统百度慧眼返回的是墨卡托坐标bd09mc而通常我们需要的是经纬度坐标bd09ll。虽然百度提供了官方转换API但我们可以通过机器学习方法建立近似映射。3.2 收集参考点首先需要收集一组已知的墨卡托坐标和对应经纬度的参考点经度(lng)纬度(lat)墨卡托X(x)墨卡托Y(y)113.90457922.656801126799672573996113.79025122.676882126672282576368............3.3 建立线性回归模型使用sklearn的LinearRegression建立映射关系from sklearn.linear_model import LinearRegression # 准备训练数据 reference_df pd.read_csv(reference_points.csv) # 上面表格保存为CSV X_train reference_df[[x, y]] y_train reference_df[[lng, lat]] # 训练模型 model LinearRegression() model.fit(X_train, y_train) # 评估模型 print(f模型R²分数{model.score(X_train, y_train):.6f})3.4 应用坐标转换将模型应用到原始数据def convert_coordinates(df, model): coordinates model.predict(df[[x, y]]) df[lng] coordinates[:, 0] df[lat] coordinates[:, 1] return df heatmap_df convert_coordinates(heatmap_df, model)4. 工程化实现构建完整数据处理管道4.1 模块化设计将上述步骤封装为可重用的函数class HeatmapProcessor: def __init__(self, api_key, city_code): self.api_key api_key self.city_code city_code self.model None def load_reference_points(self, filepath): 加载参考点并训练模型 ref_df pd.read_csv(filepath) self.model LinearRegression() self.model.fit(ref_df[[x, y]], ref_df[[lng, lat]]) def fetch_data(self): 获取原始热力图数据 # 实现请求逻辑... pass def process(self): 完整处理流程 raw_data self.fetch_data() df parse_heatmap_data(raw_data) if self.model: df convert_coordinates(df, self.model) return df4.2 错误处理与日志添加健壮的错误处理和日志记录import logging from datetime import datetime logging.basicConfig( filenameheatmap.log, levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) class HeatmapProcessor: # ... 其他代码 ... def fetch_data(self): try: response requests.get(self.api_url, headersself.headers, paramsself.params) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logging.error(fAPI请求失败: {str(e)}) raise4.3 定时任务集成使用schedule库实现定时获取import schedule import time def job(): try: processor HeatmapProcessor(API_KEY, CITY_CODE) processor.load_reference_points(ref_points.csv) df processor.process() df.to_csv(fdata/heatmap_{datetime.now().strftime(%Y%m%d_%H%M)}.csv) except Exception as e: logging.error(f任务执行失败: {str(e)}) # 每小时执行一次 schedule.every().hour.do(job) while True: schedule.run_pending() time.sleep(60)5. 数据可视化与应用5.1 使用Pyecharts可视化from pyecharts.charts import Geo from pyecharts import options as opts def visualize_heatmap(df): geo Geo() geo.add_schema(maptype深圳) # 添加数据点 for _, row in df.iterrows(): geo.add_coordinate( fpoint_{row.name}, row[lng], row[lat] ) geo.add( , [(fpoint_{row.name}, row[value])], type_scatter, symbol_size5 ) geo.set_global_opts( title_optsopts.TitleOpts(title深圳人流热力图), visualmap_optsopts.VisualMapOpts(max_df[value].max()) ) return geo heatmap_chart visualize_heatmap(heatmap_df) heatmap_chart.render(heatmap.html)5.2 数据分析应用示例计算各区域人流密度# 使用KMeans聚类识别热点区域 from sklearn.cluster import KMeans coordinates heatmap_df[[lng, lat]].values kmeans KMeans(n_clusters10) heatmap_df[cluster] kmeans.fit_predict(coordinates) # 计算每个聚类的人流总量 cluster_stats heatmap_df.groupby(cluster)[value].agg([sum, count]) print(人流热点区域统计) print(cluster_stats.sort_values(sum, ascendingFalse))6. 性能优化与扩展6.1 批量处理与并行化对于大规模数据可以使用多线程/进程from concurrent.futures import ThreadPoolExecutor def process_city(city_code): processor HeatmapProcessor(API_KEY, city_code) return processor.process() city_codes [440300, 310000] # 深圳、上海 with ThreadPoolExecutor() as executor: results list(executor.map(process_city, city_codes))6.2 模型优化尝试更复杂的回归模型提高坐标转换精度from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import cross_val_score # 使用随机森林回归 rf_model RandomForestRegressor(n_estimators100) scores cross_val_score(rf_model, X_train, y_train, cv5) print(f交叉验证R²: {scores.mean():.4f} (±{scores.std():.4f}))6.3 数据持久化使用SQLite存储历史数据import sqlite3 from contextlib import closing def save_to_db(df, db_pathheatmap.db): with closing(sqlite3.connect(db_path)) as conn: df.to_sql(heatmap_data, conn, if_existsappend, indexFalse)

更多文章