Python利用多进程将大量数据放入有限内存的教程
简介
这是一篇有关如何将大量的数据放入有限的内存中的简略教程。
与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作。大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许。这种方法对时间、机器硬件和所处环境都有要求。
下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j、MongoDB或其他类型的数据库,仅仅使用csvs、tsvs等格式存储的表格),如果将所有表格组合在一起,得到的数据帧太大,无法放入内存。所以第一个想法是:将其拆分成不同的部分,逐个存储。这个方案看起来不错,但处理起来很慢。除非我们使用多核处理器。
目标
这里的目标是从所有职位中(大约1万个),找出相关的的职位。将这些职位与政府给的职位代码组合起来。接着将组合的结果与对应的州(行政单位)信息组合起来。然后用通过word2vec生成的属性信息在我们的客户的管道中增强已有的属性。
这个任务要求在短时间内完成,谁也不愿意等待。想象一下,这就像在不使用标准的关系型数据库的情况下进行多个表的连接。
数据
示例脚本
下面的是一个示例脚本,展示了如何使用multiprocessing来在有限的内存空间中加速操作过程。脚本的第一部分是和特定任务相关的,可以自由跳过。请着重关注第二部分,这里侧重的是multiprocessing引擎。
#import the necessary packages import pandas as pd import us import numpy as np from multiprocessing import Pool,cpu_count,Queue,Manager # the data in one particular column was number in the form that horrible excel version # of a number where '12000' is '12,000' with that beautiful useless comma in there. # did I mention I excel bothers me? # instead of converting the number right away, we only convert them when we need to def median_maker(column): return np.median([int(x.replace(',','')) for x in column]) # dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist' # related_title_score_df is the dataframe of information for the title; columns = ['title','score'] ### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871 # code_title_df contains columns ['code','title'] # oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!) def job_title_location_matcher(title,location): try: related_title_score_df = dictionary_of_dataframes[title] # we limit dataframe1 to only those related_titles that are above # a previously established threshold related_title_score_df = related_title_score_df[title_score_df['score']>80] #we merge the related titles with another table and its codes codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df) codes_relTitles_scores = codes_relTitles_scores.drop_duplicates() # merge the two dataframes by the codes merged_df = pd.merge(codes_relTitles_scores, oes_data_df) #limit the BLS data to the state we want all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)] #calculate some summary statistics for the time we want group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker) row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90] #convert it all to strings so we can combine them all when writing to file row_string = [str(x) for x in row] return row_string except: # if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant 'do nothing'
这里发生了神奇的事情:
#runs the function and puts the answers in the queue def worker(row, q): ans = job_title_location_matcher(row[0],row[1]) q.put(ans) # this writes to the file while there are still things that could be in the queue # this allows for multiple processes to write to the same file without blocking eachother def listener(q): f = open(filename,'wb') while 1: m = q.get() if m =='kill': break f.write(','.join(m) + 'n') f.flush() f.close() def main(): #load all your data, then throw out all unnecessary tables/columns filename = 'skill_TEST_POOL.txt' #sets up the necessary multiprocessing tasks manager = Manager() q = manager.Queue() pool = Pool(cpu_count() + 2) watcher = pool.map_async(listener,(q,)) jobs = [] #titles_states is a dataframe of millions of job titles and states they were found in for i in titles_states.iloc: job = pool.map_async(worker, (i, q)) jobs.append(job) for job in jobs: job.get() q.put('kill') pool.close() pool.join() if __name__ == "__main__": main()
由于每个数据帧的大小都不同(总共约有100Gb),所以将所有数据都放入内存是不可能的。通过将最终的数据帧逐行写入内存,但从来不在内存中存储完整的数据帧。我们可以完成所有的计算和组合任务。这里的“标准方法”是,我们可以仅仅在“job_title_location_matcher”的末尾编写一个“write_line”方法,但这样每次只会处理一个实例。根据我们需要处理的职位/州的数量,这大概需要2天的时间。而通过multiprocessing,只需2个小时。
虽然读者可能接触不到本教程处理的任务环境,但通过multiprocessing,可以突破许多计算机硬件的限制。本例的工作环境是c3.8xl ubuntu ec2,硬件为32核60Gb内存(虽然这个内存很大,但还是无法一次性放入所有数据)。这里的关键之处是我们在60Gb的内存的机器上有效的处理了约100Gb的数据,同时速度提升了约25倍。通过multiprocessing在多核机器上自动处理大规模的进程,可以有效提高机器的利用率。也许有些读者已经知道了这个方法,但对于其他人,可以通过multiprocessing能带来非常大的收益。顺便说一句,这部分是skill assets in the job-market这篇博文的延续。

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

PHP主要是过程式编程,但也支持面向对象编程(OOP);Python支持多种范式,包括OOP、函数式和过程式编程。PHP适合web开发,Python适用于多种应用,如数据分析和机器学习。

PHP适合网页开发和快速原型开发,Python适用于数据科学和机器学习。1.PHP用于动态网页开发,语法简单,适合快速开发。2.Python语法简洁,适用于多领域,库生态系统强大。

在 Sublime Text 中运行 Python 代码,需先安装 Python 插件,再创建 .py 文件并编写代码,最后按 Ctrl B 运行代码,输出会在控制台中显示。

PHP起源于1994年,由RasmusLerdorf开发,最初用于跟踪网站访问者,逐渐演变为服务器端脚本语言,广泛应用于网页开发。Python由GuidovanRossum于1980年代末开发,1991年首次发布,强调代码可读性和简洁性,适用于科学计算、数据分析等领域。

Python更适合初学者,学习曲线平缓,语法简洁;JavaScript适合前端开发,学习曲线较陡,语法灵活。1.Python语法直观,适用于数据科学和后端开发。2.JavaScript灵活,广泛用于前端和服务器端编程。

Golang在性能和可扩展性方面优于Python。1)Golang的编译型特性和高效并发模型使其在高并发场景下表现出色。2)Python作为解释型语言,执行速度较慢,但通过工具如Cython可优化性能。

在 Visual Studio Code(VSCode)中编写代码简单易行,只需安装 VSCode、创建项目、选择语言、创建文件、编写代码、保存并运行即可。VSCode 的优点包括跨平台、免费开源、强大功能、扩展丰富,以及轻量快速。

在 Notepad 中运行 Python 代码需要安装 Python 可执行文件和 NppExec 插件。安装 Python 并为其添加 PATH 后,在 NppExec 插件中配置命令为“python”、参数为“{CURRENT_DIRECTORY}{FILE_NAME}”,即可在 Notepad 中通过快捷键“F6”运行 Python 代码。
