
Mproxy Project Diary, Day 3


About This Series

This series documents the full development of the Mproxy project. The end goal is a set of proxy-server APIs. Along the way I will record the project's requirements, design, technical validation, implementation, and upgrades, including the reasoning behind design decisions and the pitfalls hit during development. I hope we can trade notes and improve together.

I keep the project source in sync on GitHub: https://github.com/mrbcy/Mproxy.

Series index:

Mproxy Project Diary, Day 1

Mproxy Project Diary, Day 2

Today's Plan

So far we have a spider that crawls proxy-server addresses from the Kuaidaili site, logs its progress to a log file, submits the proxies it finds to a Kafka cluster, and keeps running without crashing. A few parts can still be strengthened.

The first, mentioned in the previous post, is to store crawled proxy addresses in MongoDB so that an IP already crawled within the last 3 days is not submitted to the Kafka cluster again. The second is to move values like the Kafka cluster address, the MongoDB address, and the log file name into a configuration file so they are easy to change later.

Once those are done, today we will also build the validator.

Saving Crawl Records to MongoDB

MongoDB is a cross-platform NoSQL database that stores data as documents. A document's structure closely resembles a Python dictionary, which makes MongoDB very easy to work with from Python.
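A quick sketch of that dict-like interaction (assuming a local mongod on the default port; insert is the pymongo 2.x-era API in the same vein as the save call used later in this post):

#-*- coding: utf-8 -*-
from pymongo import MongoClient

client = MongoClient('localhost', 27017)      # assumes a local mongod
coll = client.mproxy.demo                     # database and collection names are illustrative
coll.insert({'ip': '1.2.3.4', 'port': '80'})  # a plain dict goes straight in
print coll.find_one({'ip': '1.2.3.4'})        # comes back as a dict-like document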

For MongoDB installation, see http://blog.csdn.net/chenpy/article/details/50324989

I then did a round of technical validation, described at http://blog.csdn.net/mrbcy/article/details/60141158

I wrote a utility class that updates proxy-server records in MongoDB and checks whether a proxy has already been seen.

#-*- coding: utf-8 -*-
import logging
import datetime

from pymongo import MongoClient


class KuaidailiProxyRecorder:
    def __init__(self, mongodb_host='localhost', port=27017, record_days=3):
        self.client = MongoClient(mongodb_host, port)
        self.db = self.client.mproxy
        self.collection = self.db.kuaidaili_proxy_records
        self.record_days = record_days

    def save_proxy(self, proxy_item):
        # Record the proxy's ip together with the time it was crawled
        try:
            record = {}
            record['ip'] = proxy_item['ip']
            record['update_time'] = datetime.datetime.now()
            self.collection.save(record)
        except Exception as e:
            logging.exception("An Error Happens")

    def find_repeat_proxy(self, ip):
        # Look for a record of this ip newer than record_days days
        try:
            d = datetime.datetime.now()
            d = d - datetime.timedelta(days=self.record_days)
            return self.collection.find_one({'ip': ip, 'update_time': {"$gt": d}})
        except Exception as e:
            logging.exception("An Error Happens")

The code is straightforward, so I won't walk through it. Here is how the pipeline uses it.

if self.proxy_recorder.find_repeat_proxy(item['ip']) is None:
    logging.debug(item['ip'] + ' is not repeat')
    self.proxy_recorder.save_proxy(item)
    # item.__dict__ makes the item JSON serializable
    self.producer.send('unchecked-servers', item.__dict__)
else:
    logging.debug(item['ip'] + ' is repeat, not submit to Kafka cluster')

If a duplicate is found in MongoDB, the proxy is not submitted to the Kafka cluster; otherwise it is submitted and its record in MongoDB is refreshed.

Reading a Configuration File

To make future maintenance easier, the MongoDB address, the Kafka cluster address, and the log file name are kept in a configuration file.

Since I had not used configuration files in Python before, I did another round of technical exploration, described at http://blog.csdn.net/mrbcy/article/details/60143067

I then wrote a configuration loader.

#-*- coding: utf-8 -*-
import ConfigParser


class ConfigLoader:
    def __init__(self):
        self.cp = ConfigParser.SafeConfigParser()
        self.cp.read('kuaidaili_spider.cfg')

    def get_mongodb_host(self):
        return self.cp.get('mongodb', 'host')

    def get_kafka_bootstrap_servers(self):
        text = self.cp.get('kafka', 'bootstrap_servers')
        return text.split(',')

    def get_log_file_name(self):
        return self.cp.get('log', 'log_file_name')
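For reference, a kuaidaili_spider.cfg consistent with these getters might look like the following. The exact values are placeholders, not necessarily the repo's; the Kafka addresses match the ones hard-coded in the Kafka listener later in this post:

[mongodb]
host = localhost

[kafka]
bootstrap_servers = amaster:9092,anode1:9092,anode2:9092

[log]
log_file_name = kuaidaili_spider.log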

Then I replaced the hard-coded strings with calls to these functions.

With that, the Kuaidaili spider is basically done. Next we move on to the validator.

Validator Development

Goal

The validator's job is to check whether the proxies the spider collected actually work. Since network conditions vary by region, the same proxies should ideally be tested from servers in different geographic locations, and a proxy is considered usable only when every validator reports it as usable.
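The collector that aggregates these reports comes later in the series; conceptually the rule is just a set intersection. A sketch with made-up validator names and addresses:

#-*- coding: utf-8 -*-
# Hypothetical reports from two validators; the real collector is built later.
reports = {
    'validator-a': set(['1.2.3.4:80', '5.6.7.8:9000']),
    'validator-b': set(['1.2.3.4:80']),
}
usable = set.intersection(*reports.values())
print usable  # set(['1.2.3.4:80']) -- only proxies every validator confirmed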

Unlike the spiders, the validator is written once and simply run on several machines. However, the collector that comes later needs to tell the validators apart, so each one must be given a validator name, which will be stored in its configuration file.

Validating a Proxy Server

The following code checks whether a proxy server works.

#-*- coding: utf-8 -*-
import re

import requests


def valid_proxy():
    # Browser-like headers; the Host header must match the site we request
    headers = {
        "Host": "www.sogou.com",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    }
    # Fetch sogou.com through the proxy under test
    res = requests.get('http://www.sogou.com/', headers=headers,
                       proxies={'http': '120.77.156.50:80'})
    # Sogou's ICP record number; if it appears, the page came back intact
    regex = """050897"""
    pattern = re.compile(regex)
    if re.search(pattern=pattern, string=res.text) is not None:
        print "proxy is available"
    else:
        print "proxy is unavailable"


if __name__ == '__main__':
    valid_proxy()

The idea is to fetch sogou.com through the proxy and search the returned page for Sogou's ICP record number.

Output:

proxy is available

Validating with Multiple Threads

There are many proxies to validate, and checking a single one can take a long time, so the validation has to run concurrently on multiple threads.

First we need a utility class that holds the list of proxies waiting to be validated. It must guarantee that one thread's operation on the list is not disturbed by another's, which calls for Python's locking mechanism. The experimental code is shown below:

#-*- coding: utf-8 -*-
import threading


class ProxyQueue:
    def __init__(self):
        self.lock = threading.Lock()
        self.proxy_list = []

    def add_proxy(self, proxy_item):
        # The lock is deliberately never released in this experiment,
        # so the second thread to call add_proxy will block forever.
        self.lock.acquire()
        self.proxy_list.append(proxy_item)

test.py

#-*- coding: utf-8 -*-
import threading

from util.proxyqueue import ProxyQueue


class T1(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        print "T1 add proxy"
        self.queue.add_proxy(1)
        print "T1 add proxy end"


class T2(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        print "T2 add proxy"
        self.queue.add_proxy(1)
        print "T2 add proxy end"


if __name__ == '__main__':
    queue = ProxyQueue()
    t1 = T1(queue)
    t2 = T2(queue)
    t1.start()
    t2.start()

Output:

T1 add proxy
T1 add proxy end
T2 add proxy

The program then sits waiting, which shows the lock is doing its job. The complete utility class follows:

#-*- coding: utf-8 -*-
import threading


class ProxyQueue:
    def __init__(self):
        self.lock = threading.Lock()
        self.proxy_list = []

    def add_proxy(self, proxy_item):
        self.lock.acquire()
        self.proxy_list.append(proxy_item)
        self.lock.release()

    def get_proxy(self):
        # Peek at the head of the queue without removing it
        proxy_item = None
        self.lock.acquire()
        if len(self.proxy_list) > 0:
            proxy_item = self.proxy_list[0]
        self.lock.release()
        return proxy_item

    def pop_proxy(self):
        # Remove and return the head of the queue; pop(0) keeps this
        # FIFO and consistent with get_proxy above
        proxy_item = None
        self.lock.acquire()
        if len(self.proxy_list) > 0:
            proxy_item = self.proxy_list.pop(0)
        self.lock.release()
        return proxy_item

    def get_proxy_count(self):
        self.lock.acquire()
        proxy_count = len(self.proxy_list)
        self.lock.release()
        return proxy_count

Next we write the multi-threaded validation driver. The code is as follows:

#-*- coding: utf-8 -*-
import time

from util.proxyqueue import ProxyQueue
from validator import ProxyValidator

if __name__ == '__main__':
    validator_num = 10
    validators = []
    queue = ProxyQueue()
    queue.add_proxy({'ip': '182.254.129.123', 'port': '80'})
    queue.add_proxy({'ip': '101.53.101.172', 'port': '9999'})
    queue.add_proxy({'ip': '106.46.136.204', 'port': '808'})
    queue.add_proxy({'ip': '117.90.1.34', 'port': '9000'})
    queue.add_proxy({'ip': '117.90.6.134', 'port': '9000'})
    queue.add_proxy({'ip': '125.123.76.134', 'port': '8998'})
    queue.add_proxy({'ip': '125.67.75.53', 'port': '9000'})
    queue.add_proxy({'ip': '115.28.169.160', 'port': '8118'})
    queue.add_proxy({'ip': '117.90.1.35', 'port': '9000'})
    queue.add_proxy({'ip': '111.72.126.161', 'port': '808'})
    queue.add_proxy({'ip': '121.232.148.94', 'port': '9000'})
    queue.add_proxy({'ip': '117.90.7.106', 'port': '9000'})

    available_proxies = ProxyQueue()
    # Start the initial pool of validator threads
    for i in xrange(validator_num):
        validators.append(ProxyValidator(queue=queue, available_proxies=available_proxies))
        validators[i].start()

    while True:
        # Stop once the queue is empty and every validator has finished
        is_finish = True
        for i in xrange(validator_num):
            if validators[i].is_finish == False:
                is_finish = False
                break
        if queue.get_proxy_count() == 0 and is_finish == True:
            break
        # Replace finished validators while work remains
        for i in xrange(validator_num):
            if validators[i].is_finish == True and queue.get_proxy_count() > 0:
                validators[i] = ProxyValidator(queue=queue, available_proxies=available_proxies)
                validators[i].start()
                print "assigned a new validator to start working"
        print "current task queue length: " + str(queue.get_proxy_count())
        time.sleep(1)

    print "validation finished, available proxies: " + str(available_proxies.get_proxy_count())
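The ProxyValidator class itself lives in the repo and is not reproduced in this post. A minimal sketch consistent with how the driver uses it (the queue, available_proxies, and is_finish attributes) might look like the following; the real implementation may differ:

#-*- coding: utf-8 -*-
import re
import threading

import requests


class ProxyValidator(threading.Thread):
    # Sketch only: pops proxies from the shared queue and keeps the ones
    # that can fetch sogou.com and return its ICP record number.
    def __init__(self, queue, available_proxies):
        threading.Thread.__init__(self)
        self.queue = queue
        self.available_proxies = available_proxies
        self.is_finish = False

    def run(self):
        while True:
            proxy_item = self.queue.pop_proxy()
            if proxy_item is None:
                break  # queue drained; the driver may start a fresh thread
            proxy = proxy_item['ip'] + ':' + proxy_item['port']
            try:
                res = requests.get('http://www.sogou.com/',
                                   proxies={'http': proxy}, timeout=10)
                if re.search('050897', res.text) is not None:
                    self.available_proxies.add_proxy(proxy_item)
            except Exception:
                pass  # any network error counts as "unavailable"
        self.is_finish = True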

Bringing in Kafka

#-*- coding: utf-8 -*-
import ctypes
import inspect
import json
import threading

from kafka import KafkaConsumer


def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")


class KafkaProxyListener(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL : this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident
        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL : this function is executed in the context of the caller
        thread, to raise an exception in the context of the thread
        represented by this instance.
        """
        _async_raise(self._get_my_tid(), exctype)

    def run(self):
        consumer = KafkaConsumer('unchecked-servers',
                                 group_id='test-grou1p',
                                 bootstrap_servers=['amaster:9092', 'anode1:9092', 'anode2:9092'],
                                 auto_offset_reset='earliest',
                                 enable_auto_commit=False,
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        for message in consumer:
            v = message.value['_values']
            self.queue.add_proxy(v)

This listens on Kafka's unchecked-servers topic and pushes every proxy the spider submits onto the pending-validation queue, from which the scheduler validates them one by one.
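For context, a hedged sketch of how the listener might be wired into the scheduler. The _async_raise machinery above exists because the consumer thread blocks in its iterator, so it can only be stopped by injecting an exception; the listener's module name below is an assumption:

#-*- coding: utf-8 -*-
# Hypothetical wiring; see the repo for the real scheduler.
from util.proxyqueue import ProxyQueue
from kafkalistener import KafkaProxyListener   # module name assumed

queue = ProxyQueue()
listener = KafkaProxyListener(queue)
listener.start()

# ... run the validation loop from the driver above against `queue` ...

# On shutdown, force the blocked consumer thread to exit:
listener.raiseExc(SystemExit)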

Then I added the configuration file and the validator name. The techniques involved were covered in yesterday's post, so I won't repeat them here; see the source on GitHub for the details.

