From 3931f5a89fc6aa42edea1cae0ff28f849cc869fc Mon Sep 17 00:00:00 2001
From: hujunxianligong
Date: Mon, 19 Aug 2019 13:54:35 +0800
Subject: [PATCH 1/5] use requests instead of aiohttp

---
 examples/demo_server.py    | 12 +++++
 examples/demo_speed.py     | 23 +++++++++
 test.py                    | 95 ++++++++++++++++++++++++++++++++++++++
 webcollector/fetch.py      | 23 ++++++---
 webcollector/plugin/net.py | 30 +++++++++++-
 5 files changed, 175 insertions(+), 8 deletions(-)
 create mode 100644 examples/demo_server.py
 create mode 100644 examples/demo_speed.py
 create mode 100644 test.py

diff --git a/examples/demo_server.py b/examples/demo_server.py
new file mode 100644
index 0000000..dcde4c9
--- /dev/null
+++ b/examples/demo_server.py
@@ -0,0 +1,12 @@
+# coding=utf-8
+from flask import Flask
+import time
+import random
+app = Flask(__name__)
+
+@app.route("/")
+def index():
+    time.sleep(2)
+    return "ok"
+
+app.run()
diff --git a/examples/demo_speed.py b/examples/demo_speed.py
new file mode 100644
index 0000000..15a27f5
--- /dev/null
+++ b/examples/demo_speed.py
@@ -0,0 +1,23 @@
+# coding=utf-8
+import webcollector as wc
+import time
+
+
+class RubyChinaCrawler(wc.RamCrawler):
+    def __init__(self):
+        super().__init__(auto_detect=False)
+        self.num_threads = 10
+        self.add_seeds(["https://ruby-china.org/topics?page={}".format(i) for i in range(1, 40)])
+
+    def visit(self, page, detected):
+        print("start_visit", page.url)
+        time.sleep(4)
+        print("end_visit", page.url)
+
+
+crawler = RubyChinaCrawler()
+start = time.time()
+crawler.start(10)
+print(time.time() - start)
+
+
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..7f22256
--- /dev/null
+++ b/test.py
@@ -0,0 +1,95 @@
+# coding=utf-8
+
+import asyncio
+import requests
+import random
+import threading
+
+
+
+import asyncio
+import requests
+from concurrent.futures import ThreadPoolExecutor
+import time
+
+url = "http://127.0.0.1:5000"
+
+loop = asyncio.get_event_loop()
+
+
+async def cor():
+    print("c-start")
+    await asyncio.sleep(4)
+    print("c-end")
+
+
+async def main():
+    tasks = [loop.create_task(cor()) for _ in range(10)]
+    print("finish tasks======")
+    for i, task in enumerate(tasks):
+        print("start", i)
+        time.sleep(5)
+        await task
+        print("end", i)
+
+loop.run_until_complete(main())
+
+adfads
+
+pool = ThreadPoolExecutor(20)
+
+
+
+def request(i):
+    print("start", i)
+    time.sleep(5)
+    text = requests.get(url).text
+    print("content:", i, text)
+    print(threading.get_ident())
+    return text
+
+
+# f0 = loop.run_in_executor(pool, request)
+# f1 = loop.run_in_executor(pool, request)
+# futures = [loop.run_in_executor(pool, request, i) for i in range(20)]
+# futures = [loop.run_in_executor(None, requests.get, "http://127.0.0.1:5000") for _ in range(10)]
+# print("======")
+
+
+async def cor(i):
+    for j in range(20):
+        future = loop.run_in_executor(pool, request, "{}_{}".format(i, j))
+        await future
+        print("cor", i)
+    print("end-cor", i)
+
+loop.run_until_complete(asyncio.gather(*[cor(i) for i in range(10)]))
+
+
+# async def main():
+#     # for future in futures:
+#     for i in range(10):
+#         # future = loop.run_in_executor(pool, request)
+#         await futures[i]
+#         print("end", i)
+#
+# loop.run_until_complete(main())
+
+# async def test():
+#     print("start")
+#     # await asyncio.sleep(2)
+#     request_future = loop.run_in_executor(pool, request)
+#     result = await request_future
+#     print("end")
+#
+#
+#
+#
+# async def main():
+#     tasks = [loop.create_task(test()) for _ in range(10)]
+#     for i in range(10):
+#         await tasks[i]
+#         print("task end", i)
+#
+#
+# loop.run_until_complete(main())
\ No newline at end of file
diff --git a/webcollector/fetch.py b/webcollector/fetch.py
index 40a3ade..0b7fe6b 100644
--- a/webcollector/fetch.py
+++ b/webcollector/fetch.py
@@ -28,6 +28,9 @@ def __init__(self,
         self.execute_func = execute_func
         self.num_threads = num_threads
 
+        self.loop = None
+
+
     async def async_start(self):
         self.fetch_queue = queue.Queue()
         self.feed_stopped = False
@@ -35,14 +38,19 @@ async def async_start(self):
         self.db_manager.init_fetch_and_detect()
         self.generator = self.db_manager.create_generator()
         self.generator.generator_filter = self.generator_filter
-        async with self.requester.create_async_context_manager():
-            coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
-            await asyncio.gather(*coroutines)
+
+        # async with self.requester.create_async_context_manager():
+        #     coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
+        #     await asyncio.gather(*coroutines)
+
+        coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
+        await asyncio.gather(*coroutines)
+
         self.db_manager.close()
 
     def start(self):
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(self.async_start())
+        self.loop = asyncio.get_event_loop()
+        self.loop.run_until_complete(self.async_start())
         return self.generator.num_generated
 
     def feed(self):
@@ -63,7 +71,10 @@ async def fetch_coroutine(self, execute_func):
             else:
                 crawl_datum = self.fetch_queue.get(block=False)
                 try:
-                    page = await self.requester.get_response(crawl_datum)
+                    # loop = asyncio.get_event_loop()
+                    request_future = self.loop.run_in_executor(None, self.requester.get_response, crawl_datum)
+                    page = await request_future
+                    # page = await self.requester.get_response(crawl_datum)
                     detected = CrawlDatums()
                     execute_func(page, detected)
diff --git a/webcollector/plugin/net.py b/webcollector/plugin/net.py
index 3a81aaa..991d87c 100644
--- a/webcollector/plugin/net.py
+++ b/webcollector/plugin/net.py
@@ -3,9 +3,9 @@
 from webcollector.model import Page
 from webcollector.net import Requester
 import aiohttp
+import requests
 
-
-class HttpRequester(Requester):
+class AioHttpRequester(Requester):
 
     def __init__(self):
         self.session = None
@@ -32,5 +32,31 @@ async def get_response(self, crawl_datum):
             return page
 
 
+class HttpRequester(Requester):
+
+    def __init__(self):
+        self.session = None
+
+
+    # def create_async_context_manager(self):
+    #     self.session = aiohttp.ClientSession()
+    #     return self.session
+
+    # def request(self, crawl_datum):
+    #     headers = {"User-Agent": DEFAULT_USER_AGENT}
+    #     return requests.get(crawl_datum.url, headers=headers)
+
+    def get_response(self, crawl_datum):
+        headers = {"User-Agent": DEFAULT_USER_AGENT}
+        response = requests.get(crawl_datum.url, headers=headers)
+
+        code = response.status_code
+        content = response.content
+        encoding = response.encoding
+        content_type = response.headers["Content-Type"]
+        crawl_datum.code = code
+        page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding)
+
+        return page
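The fetch.py hunks above are the core of this patch: the asyncio-based fetch coroutines stay, but each blocking `requests` call is handed to a worker thread through `loop.run_in_executor` instead of being awaited as an aiohttp coroutine, which is also the pattern the scratch experiments in test.py probe. A minimal, self-contained sketch of that pattern (the URLs, pool size and status-code handling below are illustrative placeholders, not values taken from the patch):

```python
# Standalone sketch (not part of the patch) of the pattern fetch.py adopts:
# blocking requests.get calls run on executor threads and are awaited from
# asyncio coroutines. URLS and the pool size are illustrative placeholders.
import asyncio
from concurrent.futures import ThreadPoolExecutor

import requests

URLS = ["https://example.org" for _ in range(10)]  # placeholder targets


def blocking_get(url):
    # Plain synchronous HTTP call; it runs inside a worker thread.
    return requests.get(url).status_code


async def fetch(loop, pool, url):
    # Awaiting the executor future keeps the event loop free while the thread blocks.
    return await loop.run_in_executor(pool, blocking_get, url)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    pool = ThreadPoolExecutor(5)
    codes = loop.run_until_complete(
        asyncio.gather(*[fetch(loop, pool, url) for url in URLS])
    )
    print(codes)
```

This mirrors the new code path in `fetch_coroutine`, where `self.loop.run_in_executor(None, self.requester.get_response, crawl_datum)` is awaited instead of an aiohttp coroutine.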
From fbcb316cbdb11dc208f947b866d99c0cb390f0bc Mon Sep 17 00:00:00 2001
From: hujunxianligong
Date: Mon, 19 Aug 2019 21:52:12 +0800
Subject: [PATCH 2/5] 0.5

---
 README.md                            | 64 +++++++++++++++++++++++++
 examples/demo_custom_http_request.py | 56 ++++++++++++++++++++++
 examples/demo_speed.py               |  4 +-
 setup.py                             |  5 +-
 webcollector/fetch.py                |  7 ++-
 webcollector/net.py                  | 16 +++++--
 webcollector/plugin/net.py           | 70 ++++++++++++----------------
 7 files changed, 170 insertions(+), 52 deletions(-)
 create mode 100644 examples/demo_custom_http_request.py

diff --git a/README.md b/README.md
index 84ca1cf..f6ee201 100644
--- a/README.md
+++ b/README.md
@@ -169,4 +169,68 @@ class NewsCrawler(wc.RedisCrawler):
 
 crawler = NewsCrawler()
 crawler.start(10)
+```
+
+### Custom Http Request with Requests
+
+[demo_custom_http_request.py](examples/demo_custom_http_request.py):
+
+
+```
+# coding=utf-8
+
+import webcollector as wc
+from webcollector.model import Page
+from webcollector.plugin.net import HttpRequester
+
+import requests
+
+
+class MyRequester(HttpRequester):
+    def get_response(self, crawl_datum):
+        # custom http request
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"
+        }
+
+        print("sending request with MyRequester")
+
+        # send request and get response
+        response = requests.get(crawl_datum.url, headers=headers)
+
+        # update code
+        crawl_datum.code = response.status_code
+
+        # wrap http response as a Page object
+        page = Page(crawl_datum,
+                    response.content,
+                    content_type=response.headers["Content-Type"],
+                    http_charset=response.encoding)
+
+        return page
+
+
+class NewsCrawler(wc.RamCrawler):
+    def __init__(self):
+        super().__init__(auto_detect=True)
+        self.num_threads = 10
+
+        # set requester to enable MyRequester
+        self.requester = MyRequester()
+
+        self.add_seed("https://github.blog/")
+        self.add_regex("+https://github.blog/[0-9]+.*")
+        self.add_regex("-.*#.*")  # do not detect urls that contain "#"
+
+    def visit(self, page, detected):
+        if page.match_url("https://github.blog/[0-9]+.*"):
+            title = page.select("h1.lh-condensed")[0].text.strip()
+            content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip()
+            print("\nURL: ", page.url)
+            print("TITLE: ", title)
+            print("CONTENT: ", content[:50], "...")
+
+
+crawler = NewsCrawler()
+crawler.start(10)
 ```
\ No newline at end of file
diff --git a/examples/demo_custom_http_request.py b/examples/demo_custom_http_request.py
new file mode 100644
index 0000000..d9068e4
--- /dev/null
+++ b/examples/demo_custom_http_request.py
@@ -0,0 +1,56 @@
+# coding=utf-8
+
+import webcollector as wc
+from webcollector.model import Page
+from webcollector.plugin.net import HttpRequester
+
+import requests
+
+
+class MyRequester(HttpRequester):
+    def get_response(self, crawl_datum):
+        # custom http request
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"
+        }
+
+        print("sending request with MyRequester")
+
+        # send request and get response
+        response = requests.get(crawl_datum.url, headers=headers)
+
+        # update code
+        crawl_datum.code = response.status_code
+
+        # wrap http response as a Page object
+        page = Page(crawl_datum,
+                    response.content,
+                    content_type=response.headers["Content-Type"],
+                    http_charset=response.encoding)
+
+        return page
+
+
+class NewsCrawler(wc.RamCrawler):
+    def __init__(self):
+        super().__init__(auto_detect=True)
+        self.num_threads = 10
+
+        # set requester to enable MyRequester
+        self.requester = MyRequester()
+
+        self.add_seed("https://github.blog/")
+        self.add_regex("+https://github.blog/[0-9]+.*")
+        self.add_regex("-.*#.*")  # do not detect urls that contain "#"
+
+    def visit(self, page, detected):
+        if page.match_url("https://github.blog/[0-9]+.*"):
+            title = page.select("h1.lh-condensed")[0].text.strip()
+            content = page.select("div.markdown-body")[0].text.replace("\n", " ").strip()
+            print("\nURL: ", page.url)
+            print("TITLE: ", title)
+            print("CONTENT: ", content[:50], "...")
+
+
+crawler = NewsCrawler()
+crawler.start(10)
\ No newline at end of file
diff --git a/examples/demo_speed.py b/examples/demo_speed.py
index 15a27f5..9e894e5 100644
--- a/examples/demo_speed.py
+++ b/examples/demo_speed.py
@@ -11,7 +11,7 @@ def __init__(self):
 
     def visit(self, page, detected):
         print("start_visit", page.url)
-        time.sleep(4)
+        # time.sleep(4)
         print("end_visit", page.url)
 
 
@@ -19,5 +19,3 @@ def visit(self, page, detected):
 start = time.time()
 crawler.start(10)
 print(time.time() - start)
-
-
diff --git a/setup.py b/setup.py
index 5db3757..ecf1e73 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name="webcollector",
-    version="0.0.4-alpha",
+    version="0.0.5-alpha",
     author="Jun Hu",
     packages=find_packages(
         exclude=[
@@ -13,7 +13,8 @@
         "html5lib",
         "aiohttp",
         "BeautifulSoup4",
-        "redis"
+        "redis",
+        "requests"
     ],
     description="WebCollector-Python is an open source web crawler framework based on Python.It provides some simple interfaces for crawling the Web,you can setup a multi-threaded web crawler in less than 5 minutes.",
     license="GNU General Public License v3.0 (See LICENSE)",
diff --git a/webcollector/fetch.py b/webcollector/fetch.py
index 0b7fe6b..aa88016 100644
--- a/webcollector/fetch.py
+++ b/webcollector/fetch.py
@@ -42,10 +42,9 @@ async def async_start(self):
         # async with self.requester.create_async_context_manager():
         #     coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
         #     await asyncio.gather(*coroutines)
-
-        coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
-        await asyncio.gather(*coroutines)
-
+        with self.requester:
+            coroutines = [self.fetch_coroutine(self.execute_func) for _ in range(self.num_threads)]
+            await asyncio.gather(*coroutines)
         self.db_manager.close()
 
     def start(self):
diff --git a/webcollector/net.py b/webcollector/net.py
index 36c6378..439446b 100644
--- a/webcollector/net.py
+++ b/webcollector/net.py
@@ -3,8 +3,18 @@
 
 class Requester(object):
 
-    async def get_response(self, crawl_datum):
-        return None
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def get_response(self, crawl_datum):
+        raise NotImplementedError()
 
     def create_async_context_manager(self):
-        return None
\ No newline at end of file
+        return None
+
+
+with Requester() as r:
+    print(r)
\ No newline at end of file
diff --git a/webcollector/plugin/net.py b/webcollector/plugin/net.py
index 991d87c..eb9c3a0 100644
--- a/webcollector/plugin/net.py
+++ b/webcollector/plugin/net.py
@@ -2,50 +2,11 @@
 from webcollector.config import DEFAULT_USER_AGENT
 from webcollector.model import Page
 from webcollector.net import Requester
-import aiohttp
 import requests
-
 
 class HttpRequester(Requester):
 
-    def __init__(self):
-        self.session = None
-
-
-    # def create_async_context_manager(self):
-    #     self.session = aiohttp.ClientSession()
-    #     return self.session
-
-    # def request(self, crawl_datum):
-    #     headers = {"User-Agent": DEFAULT_USER_AGENT}
-    #     return requests.get(crawl_datum.url, headers=headers)
-
     def get_response(self, crawl_datum):
         headers = {"User-Agent": DEFAULT_USER_AGENT}
         response = requests.get(crawl_datum.url, headers=headers)
@@ -57,6 +18,35 @@ def get_response(self, crawl_datum):
         code = response.status_code
         content = response.content
         encoding = response.encoding
         content_type = response.headers["Content-Type"]
         crawl_datum.code = code
         page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding)
-
+
         return page
+
+
+# class AioHttpRequester(Requester):
+#
+#     def __init__(self):
+#         self.session = None
+#
+#     def create_async_context_manager(self):
+#         self.session = aiohttp.ClientSession()
+#         return self.session
+#
+#     def request(self, crawl_datum):
+#         return self.session.get(
+#             crawl_datum.url,
+#             headers={"User-Agent": DEFAULT_USER_AGENT}
+#         )
+#
+#     async def get_response(self, crawl_datum):
+#         # async with self.session.get(crawl_datum.url) as response:
+#         async with self.request(crawl_datum) as response:
+#             code = response.status
+#             content = await response.content.read()
+#             encoding = response.get_encoding()
+#             content_type = response.content_type
+#             crawl_datum.code = code
+#             page = Page(crawl_datum, content, content_type=content_type, http_charset=encoding)
+#             return page
+
+
+
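With this patch the requester is managed by an ordinary synchronous context manager: `Requester` grows `__enter__`/`__exit__`, and `async_start` wraps the fetch coroutines in `with self.requester:` instead of aiohttp's `async with`. The base hooks do nothing, so a subclass that needs per-crawl state can override them. The `SessionRequester` below is not part of the patch; it is a hypothetical sketch of a subclass that keeps one `requests.Session` open for the duration of the crawl:

```python
# Hypothetical sketch (not part of this patch): a Requester subclass that uses
# the new __enter__/__exit__ hooks to keep a single requests.Session alive for
# the whole `with self.requester:` block in fetch.py.
import requests

from webcollector.config import DEFAULT_USER_AGENT
from webcollector.model import Page
from webcollector.net import Requester


class SessionRequester(Requester):

    def __enter__(self):
        # Opened once per crawl and reused by every get_response call.
        self.session = requests.Session()
        self.session.headers["User-Agent"] = DEFAULT_USER_AGENT
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closed when async_start leaves the with-block.
        self.session.close()

    def get_response(self, crawl_datum):
        response = self.session.get(crawl_datum.url)
        crawl_datum.code = response.status_code
        return Page(crawl_datum,
                    response.content,
                    content_type=response.headers["Content-Type"],
                    http_charset=response.encoding)
```

As with `MyRequester` in the README section added above, a crawler would opt in by assigning `self.requester = SessionRequester()` in its `__init__`.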
From fc7a2a0dab70d8f967f59dd9e6b162806d0c9f5c Mon Sep 17 00:00:00 2001
From: hujunxianligong
Date: Mon, 19 Aug 2019 21:53:58 +0800
Subject: [PATCH 3/5] readme

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index f6ee201..69c0707 100644
--- a/README.md
+++ b/README.md
@@ -176,7 +176,7 @@ crawler.start(10)
 [demo_custom_http_request.py](examples/demo_custom_http_request.py):
 
 
-```
+```python
 # coding=utf-8
 
 import webcollector as wc
From d906be09cf93db20feed11163eedd9793217847d Mon Sep 17 00:00:00 2001
From: hujunxianligong
Date: Mon, 19 Aug 2019 21:58:20 +0800
Subject: [PATCH 4/5] bug

---
 webcollector/net.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/webcollector/net.py b/webcollector/net.py
index 439446b..39c9fb5 100644
--- a/webcollector/net.py
+++ b/webcollector/net.py
@@ -14,7 +14,3 @@ def get_response(self, crawl_datum):
 
     def create_async_context_manager(self):
         return None
-
-
-with Requester() as r:
-    print(r)
\ No newline at end of file
From c7388a5e4dd668dc7224d88665ecfa99e5f8d10e Mon Sep 17 00:00:00 2001
From: hujunxianligong
Date: Mon, 19 Aug 2019 22:02:57 +0800
Subject: [PATCH 5/5] bug

---
 webcollector/net.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/webcollector/net.py b/webcollector/net.py
index 39c9fb5..684aa7b 100644
--- a/webcollector/net.py
+++ b/webcollector/net.py
@@ -10,7 +10,4 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         pass
 
     def get_response(self, crawl_datum):
-        raise NotImplementedError()
-
-    def create_async_context_manager(self):
-        return None
+        raise NotImplementedError()
\ No newline at end of file
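For reference, patches 2, 4 and 5 together leave the `Requester` base class in webcollector/net.py with only the synchronous context-manager hooks and an abstract `get_response`. Reconstructed from the hunks above (any module header before line 3 is outside the hunks and omitted):

```python
# End state of the Requester base class after PATCH 5/5, per the hunks above.
class Requester(object):

    def __enter__(self):
        # Default: no per-crawl setup; subclasses may open sessions here.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Default: no teardown.
        pass

    def get_response(self, crawl_datum):
        # Concrete requesters (e.g. webcollector.plugin.net.HttpRequester)
        # must return a Page built from crawl_datum.
        raise NotImplementedError()
```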