首页 > 编程 > Python > 正文

Python实现大文件排序的方法

2020-01-04 18:05:50
字体:
来源:转载
供稿:网友

这篇文章主要介绍了Python大文件排序的方法,涉及Python针对文件、缓存及日期等操作的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下

本文实例讲述了Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:

 

 
  1. import gzip 
  2. import os 
  3. from multiprocessing import Process, Queue, Pipe, current_process, freeze_support 
  4. from datetime import datetime 
  5. def sort_worker(input,output): 
  6. while True: 
  7. lines = input.get().splitlines() 
  8. element_set = {} 
  9. for line in lines: 
  10. if line.strip() == 'STOP'
  11. return 
  12. try
  13. element = line.split(' ')[0] 
  14. if not element_set.get(element): element_set[element] = '' 
  15. except: 
  16. pass 
  17. sorted_element = sorted(element_set) 
  18. #print sorted_element 
  19. output.put('/n'.join(sorted_element)) 
  20. def write_worker(input, pre): 
  21. os.system('mkdir %s'%pre) 
  22. i = 0 
  23. while True: 
  24. content = input.get() 
  25. if content.strip() == 'STOP'
  26. return 
  27. write_sorted_bulk(content, '%s/%s'%(pre, i)) 
  28. i += 1 
  29. def write_sorted_bulk(content, filename): 
  30. f = file(filename, 'w'
  31. f.write(content) 
  32. f.close() 
  33. def split_sort_file(filename, num_sort = 3, buf_size = 65536*64*4): 
  34. t = datetime.now() 
  35. pre, ext = os.path.splitext(filename) 
  36. if ext == '.gz'
  37. file_file = gzip.open(filename, 'rb'
  38. else
  39. file_file = open(filename) 
  40. bulk_queue = Queue(10) 
  41. sorted_queue = Queue(10) 
  42. NUM_SORT = num_sort 
  43. sort_worker_pool = [] 
  44. for i in range(NUM_SORT): 
  45. sort_worker_pool.append( Process(target=sort_worker, args=(bulk_queue, sorted_queue)) ) 
  46. sort_worker_pool[i].start() 
  47. NUM_WRITE = 1 
  48. write_worker_pool = [] 
  49. for i in range(NUM_WRITE): 
  50. write_worker_pool.append( Process(target=write_worker, args=(sorted_queue, pre)) ) 
  51. write_worker_pool[i].start() 
  52. buf = file_file.read(buf_size) 
  53. sorted_count = 0 
  54. while len(buf): 
  55. end_line = buf.rfind('/n'
  56. #print buf[:end_line+1] 
  57. bulk_queue.put(buf[:end_line+1]) 
  58. sorted_count += 1 
  59. if end_line != -1: 
  60. buf = buf[end_line+1:] + file_file.read(buf_size) 
  61. else
  62. buf = file_file.read(buf_size) 
  63. for i in range(NUM_SORT): 
  64. bulk_queue.put('STOP'
  65. for i in range(NUM_SORT): 
  66. sort_worker_pool[i].join() 
  67.  
  68. for i in range(NUM_WRITE): 
  69. sorted_queue.put('STOP'
  70. for i in range(NUM_WRITE): 
  71. write_worker_pool[i].join() 
  72. print 'elasped ', datetime.now() - t 
  73. return sorted_count 
  74. from heapq import heappush, heappop 
  75. from datetime import datetime 
  76. from multiprocessing import Process, Queue, Pipe, current_process, freeze_support 
  77. import os 
  78. class file_heap: 
  79. def __init__(self, dir, idx = 0, count = 1): 
  80. files = os.listdir(dir) 
  81. self.heap = [] 
  82. self.files = {} 
  83. self.bulks = {} 
  84. self.pre_element = None 
  85. for i in range(len(files)): 
  86. file = files[i] 
  87. if hash(file) % count != idx: continue 
  88. input = open(os.path.join(dir, file)) 
  89. self.files[i] = input 
  90. self.bulks[i] = '' 
  91. heappush(self.heap, (self.get_next_element_buffered(i), i)) 
  92. def get_next_element_buffered(self, i): 
  93. if len(self.bulks[i]) < 256: 
  94. if self.files[i] is not None: 
  95. buf = self.files[i].read(65536) 
  96. if buf: 
  97. self.bulks[i] += buf 
  98. else
  99. self.files[i].close() 
  100. self.files[i] = None 
  101. end_line = self.bulks[i].find('/n'
  102. if end_line == -1: 
  103. end_line = len(self.bulks[i]) 
  104. element = self.bulks[i][:end_line] 
  105. self.bulks[i] = self.bulks[i][end_line+1:] 
  106. return element 
  107. def poppush_uniq(self): 
  108. while True: 
  109. element = self.poppush() 
  110. if element is None: 
  111. return None 
  112. if element != self.pre_element: 
  113. self.pre_element = element 
  114. return element 
  115. def poppush(self): 
  116. try
  117. element, index = heappop(self.heap) 
  118. except IndexError: 
  119. return None 
  120. new_element = self.get_next_element_buffered(index) 
  121. if new_element: 
  122. heappush(self.heap, (new_element, index)) 
  123. return element 
  124. def heappoppush(dir, queue, idx = 0, count = 1): 
  125. heap = file_heap(dir, idx, count) 
  126. while True: 
  127. d = heap.poppush_uniq() 
  128. queue.put(d) 
  129. if d is None: return 
  130. def heappoppush2(dir, queue, count = 1): 
  131. heap = [] 
  132. procs = [] 
  133. queues = [] 
  134. pre_element = None 
  135. for i in range(count): 
  136. q = Queue(1024) 
  137. q_buf = queue_buffer(q) 
  138. queues.append(q_buf) 
  139. p = Process(target=heappoppush, args=(dir, q_buf, i, count)) 
  140. procs.append(p) 
  141. p.start() 
  142. queues = tuple(queues) 
  143. for i in range(count): 
  144. heappush(heap, (queues[i].get(), i)) 
  145. while True: 
  146. try
  147. d, i= heappop(heap) 
  148. except IndexError: 
  149. queue.put(None) 
  150. for p in procs: 
  151. p.join() 
  152. return 
  153. else
  154. if d is not None: 
  155. heappush(heap,(queues[i].get(), i)) 
  156. if d != pre_element: 
  157. pre_element = d 
  158. queue.put(d) 
  159. def merge_file(dir): 
  160. heap = file_heap( dir ) 
  161. os.system('rm -f '+dir+'.merge'
  162. fmerge = open(dir+'.merge''a'
  163. element = heap.poppush_uniq() 
  164. fmerge.write(element+'/n'
  165. while element is not None: 
  166. element = heap.poppush_uniq() 
  167. fmerge.write(element+'/n'
  168. class queue_buffer: 
  169. def __init__(self, queue): 
  170. self.q = queue 
  171. self.rbuf = [] 
  172. self.wbuf = [] 
  173. def get(self): 
  174. if len(self.rbuf) == 0: 
  175. self.rbuf = self.q.get() 
  176. r = self.rbuf[0] 
  177. del self.rbuf[0] 
  178. return r 
  179. def put(self, d): 
  180. self.wbuf.append(d) 
  181. if d is None or len(self.wbuf) > 1024: 
  182. self.q.put(self.wbuf) 
  183. self.wbuf = [] 
  184. def diff_file(file_old, file_new, file_diff, buf = 268435456): 
  185. print 'buffer size', buf 
  186. from file_split import split_sort_file 
  187. os.system('rm -rf '+ os.path.splitext(file_old)[0] ) 
  188. os.system('rm -rf '+ os.path.splitext(file_new)[0] ) 
  189. t = datetime.now() 
  190. split_sort_file(file_old,5,buf) 
  191. split_sort_file(file_new,5,buf) 
  192. print 'split elasped ', datetime.now() - t 
  193. os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0]) 
  194. os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0]) 
  195. os.system('rm -f '+file_diff) 
  196. t = datetime.now() 
  197. zdiff = open(file_diff, 'a'
  198. old_q = Queue(1024) 
  199. new_q = Queue(1024) 
  200. old_queue = queue_buffer(old_q) 
  201. new_queue = queue_buffer(new_q) 
  202. h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3)) 
  203. h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3)) 
  204. h1.start(), h2.start() 
  205. old = old_queue.get() 
  206. new = new_queue.get() 
  207. old_count, new_count = 0, 0 
  208. while old is not None or new is not None: 
  209. if old > new or old is None: 
  210. zdiff.write('< '+new+'/n'
  211. new = new_queue.get() 
  212. new_count +=1 
  213. elif old < new or new is None: 
  214. zdiff.write('> '+old+'/n'
  215. old = old_queue.get() 
  216. old_count +=1 
  217. else
  218. old = old_queue.get() 
  219. new = new_queue.get() 
  220. print 'new_count:', new_count 
  221. print 'old_count:', old_count 
  222. print 'diff elasped ', datetime.now() - t 
  223. h1.join(), h2.join() 

希望本文所述对大家的Python程序设计有所帮助。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表