1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| import time import typing as t import asyncio import aiohttp
from common import CacheProperty
class ManManBuyApi:
    """Client for the ManManBuy (慢慢买) price-comparison platform API.

    An instance holds one set of product-search parameters. The URL
    properties build the request URLs for the search endpoint and the two
    auxiliary endpoints; the ``get_*`` coroutines fetch them with aiohttp.
    """

    # Number of concurrent requests scheduled by main().
    MAX_COROUTINE_NUM = 10
    BASE_URL = 'http://api.manmanbuy.com/searchapi_utf8.aspx?'
    SHOPPING_BASE_URL = 'http://sapi.manmanbuy.com/SiteList.aspx?'
    DETAIL_BASE_URL = 'http://sapi.manmanbuy.com/Search_GetPrice.aspx?'
    # Default application key, used when ``app_key`` is not passed to __init__.
    AppKey = ''
    # Maps instance attribute names to the query-parameter names used when
    # building ``sku_url``. 'zy_status' and 'is_need_optimize' are not set by
    # __init__, so they only appear in the URL if assigned on the instance.
    ATTR_MAP = {
        'app_key': 'AppKey',
        'key': 'Key',
        'category': 'Class',
        'brand': 'Brand',
        'site': 'Site',
        'price_min': 'PriceMin',
        'price_max': 'PriceMax',
        'page_num': 'PageNum',
        'page_size': 'PageSize',
        'order_by': 'OrderBy',
        'zy': 'ZiYing',
        'zy_status': 'zyStatus',
        'is_need_optimize': 'isNeedOptimize',
    }

    def __init__(
        self,
        key: str,
        category: str = "0",
        brand: str = "0",
        site: str = "0",
        price_min: int = 0,
        price_max: int = 0,
        page_num: int = 1,
        page_size: int = 80,
        order_by: str = 'score',
        zy: bool = False,
        app_key: t.Optional[str] = None,
    ):
        """Store the search parameters.

        Each argument is kept as an attribute of the same name and is later
        sent as the query parameter named in ``ATTR_MAP``. ``app_key`` falls
        back to the class-level ``AppKey`` when omitted or empty.
        """
        self.key = key
        self.category = category
        self.brand = brand
        self.site = site
        self.price_min = price_min
        self.price_max = price_max
        self.page_num = page_num
        self.page_size = page_size
        self.order_by = order_by
        self.zy = zy
        self.app_key = app_key or self.AppKey

    @CacheProperty
    def sku_url(self) -> str:
        """Build the product-search request URL (cached via CacheProperty).

        Only instance attributes listed in ``ATTR_MAP`` are emitted, so any
        other entry in the instance dict cannot leak into the query string as
        a ``None=...`` pair. Values are URL-encoded as UTF-8.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import urlencode

        query = {
            self.ATTR_MAP[name]: value
            for name, value in vars(self).items()
            if name in self.ATTR_MAP
        }
        return f'{self.BASE_URL}{urlencode(query)}&ExtraParameter=0'

    @CacheProperty
    def shopping_mall_url(self):
        """URL of the auxiliary API that lists shopping-mall names."""
        # Use the per-instance key so an app_key passed to __init__ is honoured.
        return f'{self.SHOPPING_BASE_URL}AppKey={self.app_key}&SiteId=0'

    @CacheProperty
    def sku_detail_url(self):
        """URL of the auxiliary API for product details."""
        # Use the per-instance key so an app_key passed to __init__ is honoured.
        return f'{self.DETAIL_BASE_URL}AppKey={self.app_key}'

    async def get_goods(self) -> t.Any:
        """Request ``sku_url`` and return the decoded JSON payload.

        ``content_type='text/html'`` is passed to ``response.json`` so the
        body is decoded as JSON for a ``text/html`` response.
        """
        async with aiohttp.request('GET', self.sku_url) as response:
            return await response.json(content_type='text/html', encoding='utf-8')

    async def get_shopping(self) -> str:
        """Auxiliary API: fetch the shopping-mall ID/name listing as text."""
        print(self.shopping_mall_url)
        async with aiohttp.request('GET', self.shopping_mall_url) as response:
            return await response.text()

    async def get_detail(self) -> str:
        """Auxiliary API: fetch product information as text."""
        async with aiohttp.request('GET', self.sku_detail_url) as response:
            return await response.text()

    def callback(self, result):
        """Done-callback attached to each task scheduled by ``main``.

        ``result`` is the finished task. Intended to persist the data to a
        database; currently it only prints a marker line.
        """
        print('回调函数')

    async def _run_shopping_requests(self) -> None:
        """Schedule MAX_COROUTINE_NUM ``get_shopping`` tasks and await them all."""
        tasks = []
        for _ in range(self.MAX_COROUTINE_NUM):
            # Schedule this instance's coroutine, not a module-level global.
            task = asyncio.ensure_future(self.get_shopping())
            task.add_done_callback(self.callback)
            tasks.append(task)
        # asyncio.wait raises ValueError on an empty collection.
        if tasks:
            await asyncio.wait(tasks)

    def main(self):
        """Synchronous entry point: run the concurrent requests and print the elapsed time."""
        start_time = time.perf_counter()
        # asyncio.run creates and closes a fresh event loop for this call.
        asyncio.run(self._run_shopping_requests())
        end_time = time.perf_counter()
        print(f'开{self.MAX_COROUTINE_NUM}个任务异步处理,总耗时:{end_time - start_time}')
# Script entry: create a client searching for "phone" and run the concurrent
# request demo. The instance is kept in the module-level name ``man_man_buy``.
man_man_buy = ManManBuyApi(key="phone")
man_man_buy.main()
|