业务中可能需要对很大量级(比如 100 亿)的数据(几十 G)进行排序或是去重等操作,在不使用 Hadoop 等工具的情况下如何本地处理呢?
首先可以确定的思路是,将这几十 G 的文件分成多个小文件,然后分别排序或去重,最后合并。
Hash 分割
在分割大文件之前有一个问题,如果数据是包括字母的字符串怎么办?这时我们需要字符串 Hash 函数,byvoid 对一些常用函数做过总结。这里我们使用一种采用 md5 作为转码规则的函数,几乎不会产生碰撞。
import md5
import sys
class Encoder(object):
def encode_64(self, input_str):
md5_obj = md5.new()
md5_obj.update(input_str)
md5hex = md5_obj.digest()
md5sum0 = (ord(md5hex[3]) << 24) | (ord(md5hex[2]) << 16) \
| (ord(md5hex[1]) << 8) | ord(md5hex[0])
md5sum1 = (ord(md5hex[7]) << 24) | (ord(md5hex[6]) << 16) \
| (ord(md5hex[5]) << 8) | ord(md5hex[4])
md5sum2 = (ord(md5hex[11]) << 24) | (ord(md5hex[10]) << 16) \
| (ord(md5hex[9]) << 8) | ord(md5hex[8])
md5sum3 = (ord(md5hex[15]) << 24) | (ord(md5hex[14]) << 16) \
| (ord(md5hex[13]) << 8) | ord(md5hex[12])
sign1 = (md5sum0 + md5sum2) & 0xffffffff
sign2 = (md5sum1 + md5sum3) & 0x7fffffff
result = (sign1 | (sign2 << 32))
return str(result)
if __name__ == '__main__':
#a = '0403FCA3FB4EDF022F179D9B48B79674'
#result = encoder.encode_64(a)
encoder = Encoder()
for line in sys.stdin:
linelist = line.strip().split('\t')
cuid = linelist[0]
result = encoder.encode_64(cuid)
print '%s\t%s' % (result, cuid)
这样我们得到的都是数字,假如想把文件分成 1000 份,那么需要对每一个数字对 1000 取模,保证相同的数字在同一个文件内。对质数取模一般能更均匀地映射。
合并
拆分之后就是一堆几十 MB 的小文件了,对于每个文件都可能放进内存中排序。
之后我们维护一个堆,遍历所有文件取第一个元素,并记录文件号。每拿出一个元素就去那个文件里再取一个(直到文件被读完),调整堆结构,堆中拿出的元素就是最终的结果。