Python海量数据处理之_单机优化
|Word count:1.2k|Reading time:4min|Post View:
Python 海量数据处理之 _
单机优化
1. 说明
数据处理时,可能会遇到数千万以及上亿条数据的情况。一次处理所有数据,会遇到内存不够,计算时间太长等问题。一般的解法是:先拆分,再处理,最后将处理的结果合并(当然数据少的时候不需要这么麻烦)。本文将介绍在单机上,只使用
Python 如何处理大量数据。
2. 实例
本例是天池大数据竞赛中的“淘宝穿衣搭配”比赛,这是一个新人赛,只要注册参赛,即可下载数据。目标是根据商品信息,专家推荐,用户购物信息,计算出最佳商品组合。
本例中处理的是用户购物信息“表 1”:每条记录包含用户号 uid,商品号
mid,购物时间 time。
1 2 3 4
| uid,mid,time 4371603,8,20150418 8034236,8,20150516 6135829,8,20150405
|
需要统计每个用户都购买了什么物品,即生成“表 2”:记录包含用户号
uid,商品组合 mids。
1 2 3
| uid,mids 15 "1795974,1852545,98106,654166" 20 "2639977,79267"
|
赛题提供了千万级的购物数据,其中含有百万级的用户,全部 load
到内存再计算生成新的结构,虽然能运行,但内存占用让机器变得非常慢,普通计算只用到单
CPU,我的机器用 10 个小时才处理了 200
多万条数据,优化之后半小时以内处理完所有数据。下面看看具体实现。
3. 切分数据
(1) 目标
把数据切分成十份,分别存入文件
(2) 代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| user = pd.read_csv("../../data/user_bought_history.txt", sep=" ") user.columns = ['uid','mid','time']
dur = len(user)/10 ifrom = 0 idx = 0 while ifrom < len(user): ito = ifrom + dur data = user[ifrom:ito] print("from ", ifrom, "to ", (ito-1), "total", len(data)) data.to_csv('../../data/user_bought_' + str(idx) + '.csv', index=False) ifrom = ito idx += 1
|
4. 处理数据
(1) 目标
用多线程方式处理切分后的数据,将表 1 转换成表 2 格式
(1) 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| def do_conv(index): path = "../../data/user_bought_" + str(index) + ".csv" if not os.path.exists(path): return user = pd.read_csv(path) user['mid']=user['mid'].astype(str) grp=user.groupby('uid') print(index, len(grp))
user_buy_count_data = pd.DataFrame(columns=['uid','mids']) idx=0 arr_uid=[] arr_mid=[]
for name, group in grp: mids = ",".join(group['mid']) arr_uid.append(name) arr_mid.append(mids) if idx % 10000 == 0: show_info.show_time(str(index) + " : " + str(len(arr_uid))) idx+=1
user_buy_count_data['uid']=arr_uid user_buy_count_data['mids']=arr_mid
user_buy_count_data.to_csv("../../data/user_" + str(index) + ".csv", index=False)
if __name__ == '__main__': param_list = range(0, 11) pool = threadpool.ThreadPool(3) requests = threadpool.makeRequests(do_conv, param_list) [pool.putRequest(req) for req in requests] pool.wait()
|
(3) 技术点
- 统一处理数据格式
从文件中读出的数据默认为 int 型,用 astype 函数将整个数据表的 mid
字段变为 str 型,相对于每次处理时再转换更节约时间。
- 使用 groupby
groupby 函数将数据按不同的 uid 划分为成多个表格,groupby
还带有多种统计功能,相对于用字典方式统计数据效率高得多。
- 多线程
现在的机器都是多核的,能明显提高计算速度。python
中提供了几种不同的多线程方式,这时使用了线程池,它可以控制线程的数量,以免本例中太多线程占用大量内存让机器变慢。使用之前需要安装
threadpool 库。
1
| sudo pip install threadpool
|
5. 合并数据
(1) 目标
将转换完的数据合并,当同一个 user 在两个表中同时出现时,将 mids
累加在一起。
(2) 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| def do_add(x): if x.m == 'nan': return x.mids if x.mids == 'nan': return x.m return str(x.mids) + "," + str(x.m) def do_merge(data, path): if not os.path.exists(path): return data data.columns = ['uid','m'] ex = pd.read_csv(path) print(len(data),len(ex)) data = pd.merge(data, ex, how='outer') data['m']=data['m'].astype(str) data['mids']=data['mids'].astype(str) data['mids']=data.apply(do_add, axis=1) data = data.drop('m',axis=1) print(data.head()) return data
data = pd.DataFrame(columns=['uid','mids']) for index in range(0, 11): data = do_merge(data, "../../data/user_" + str(index) + ".csv") show_info.show_time("") print("after merge ", index, "len", len(data)) data.to_csv('../../data/user_all.csv',index=False)
|
6. 相关工具
- top 命令
top 是 linux
系统中统计系统资源占用的工具,默认为每秒统计一次,打开后按 1
键,可看到多核的占用情况。
7. 总结
在特征工程和算法的计算过程中,都可以使用先拆分再组合的方式,但前提是切分数据不会造成数据意义的变化。本文介绍了单机处理大数据的优化方式,下篇将介绍用
Hadoop 集群方案处理海量数据。