本程序从网上抓取免费代理IP存到文件,然后用抓取的IP抓取百度首页测试其是否可用,再用Flask框架搭建一个本地接口,可从此接口获取代理池中的代理IP,用此代理池中获取的IP抓取qq音乐热歌榜前300的歌曲信息保存到文件
从西刺代理获取代理IP
import requests
from requests.exceptions import RequestException
import re


def get_page(url, headers):
    """Fetch *url* with the given request headers.

    Returns the page text on HTTP 200, otherwise None (including on any
    network-level failure).
    """
    try:
        # BUG FIX: the original issued requests.get() *outside* the try
        # block, so a RequestException was never actually caught.
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        # BUG FIX: the original returned the sentinel string "出错" here,
        # which the caller would then try to parse as a page. Return None
        # so failure is unambiguous.
        return None


def html_paser(html):
    """Parse the proxy-list page.

    Returns a list of 6-tuples (full_ip, oct1, oct2, oct3, oct4, port) —
    the same group layout save() indexes into.
    """
    # re.S lets '.' match newlines. Raw string + escaped dots so that the
    # separators between octets only match a literal '.', not any character.
    patten = re.compile(r".*?((\d+)\.(\d+)\.(\d+)\.(\d+)).*?(\d+)", re.S)
    ip_info = re.findall(patten, html)
    return ip_info


def save(ip_info):
    """Write the proxies to the file "ip_pool", one "ip:port" per line."""
    with open("ip_pool", "w", encoding="utf-8") as f:
        for row in ip_info:
            # row[0] is the dotted IP; row[5] is the port captured last.
            f.write(row[0] + ":" + row[5] + "\n")


def main():
    url = "http://www.xicidaili.com"
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                             "Chrome/66.0.3359.181 Safari/537.36"}
    html = get_page(url, headers)
    if html is None:
        # Nothing fetched -> nothing to parse or save.
        return
    ip_info = html_paser(html)
    save(ip_info)


if __name__ == "__main__":
    main()
检测IP是否可用
import requests
from requests.exceptions import RequestException
from multiprocessing import Pool


def check(url, ip):
    """Probe *url* through proxy *ip* ("host:port\n") and print the verdict."""
    proxies = {
        'http': 'http://' + ip
    }
    try:
        # BUG FIX: the original called requests.get(url, proxies) — the
        # second positional argument of requests.get is `params`, so the
        # proxy dict was silently sent as query parameters and the request
        # never went through the proxy at all. It must be keyword `proxies=`.
        # Also moved the get() inside the try so RequestException is caught.
        response = requests.get(url, proxies=proxies)
        if response.status_code == 200:
            # Fetching the Baidu homepage succeeded -> proxy is usable.
            print("%s is useful %s" % (ip.strip(), response.status_code))
        else:
            # Non-200 means this proxy IP is not usable.
            print("---------!!!----------\n")
            print("%s is no use" % ip.strip())
            print("---------!!!----------\n")
    except RequestException:
        # BUG FIX: the original called exit(-1) here, killing a pool worker
        # over a single bad proxy. Just report and let the pool continue.
        print("error")


def main():
    """Check every proxy in "ip_pool" concurrently with 5 worker processes."""
    url = "http://www.baidu.com"
    # 5 worker processes check proxies in parallel.
    pool = Pool(5)
    with open("ip_pool", "r", encoding="utf-8") as f:
        for line in f:
            # One async task per proxy line.
            pool.apply_async(check, args=(url, line))
    pool.close()
    # Wait for all child processes to finish.
    pool.join()


if __name__ == "__main__":
    main()
用Flask框架搭建本地API以获取IP
from flask import *

app = Flask(__name__)
# i counts how many proxies have been served (kept for compatibility with
# the original script); line_len is the byte offset of the next unserved
# line in "ip_pool".
i = 1
line_len = 0


@app.route('/')
def index():
    """Serve one proxy per request, advancing through "ip_pool" line by line.

    NOTE: f.seek() takes byte offsets while len(line) counts characters;
    they agree here because the file contains pure-ASCII "ip:port" lines.
    """
    global i
    global line_len
    with open("ip_pool", "r", encoding="utf-8") as f:
        # Resume from where the previous request stopped.
        f.seek(line_len)
        for line in f:
            line_len += len(line)
            i += 1
            # strip() so the returned IP carries no newline/whitespace.
            return line.strip()
    # BUG FIX: once the file was exhausted the original fell off the end
    # and returned None, which makes Flask raise a 500. Wrap around to the
    # beginning of the pool instead.
    line_len = 0
    i = 1
    with open("ip_pool", "r", encoding="utf-8") as f:
        for line in f:
            line_len += len(line)
            i += 1
            return line.strip()
    # Pool file is completely empty.
    return ""


@app.route('/getall')
def get_all():
    """Return every proxy in "ip_pool" as a stringified list."""
    ips = []  # renamed from `all`, which shadowed the builtin
    with open("ip_pool", "r", encoding="utf-8") as f:
        for line in f:
            ips.append(line.strip())
    return str(ips)


if __name__ == "__main__":
    app.run()
用从本地API获取的IP抓取qq音乐热歌榜前300的歌曲信息
import requests
from urllib.parse import urlencode
import json
from bs4 import BeautifulSoup  # kept from the original script (currently unused)
from requests.exceptions import RequestException


def get_proxies():
    """Fetch one proxy ("host:port") from the local pool API.

    Exits the process if the pool service itself is unreachable.
    """
    url = "http://127.0.0.1:5000"
    # BUG FIX: the original recursed on an empty body with no depth bound,
    # risking RecursionError; loop instead.
    while True:
        response = requests.get(url)
        if response.status_code != 200:
            print("ip pool error")
            exit(-1)
        if response.text:
            return response.text


def get_one_page(url, headers, num):
    """Fetch the top-list JSONP payload for the first *num* songs via a proxy.

    Returns the raw response text on HTTP 200, otherwise None.
    """
    proxy = get_proxies()
    # Route the request through the pooled proxy.
    proxies = {
        'http': 'http://' + proxy
    }
    data = {
        "tpl": "3",
        "page": "detail",
        "date": "2018_27",
        "topid": "26",
        "type": "top",
        "song_begin": "0",
        "song_num": "%s" % num,
        "g_tk": "5381",
        "jsonpCallback": "MusicJsonCallbacktoplist",
        "loginUin": "0",
        "hostUin": "0",
        "format": "jsonp",
        "inCharset": "utf8",
        "outCharset": "utf-8",
        "notice": "0",
        "platform": "yqq",
        "needNewCode": "0",
    }
    url = url + urlencode(data)
    try:
        # BUG FIX: the original issued the request *outside* the try, so a
        # RequestException escaped uncaught.
        response = requests.get(url, headers=headers, proxies=proxies)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        return None


def html_paser(html, song_url):
    """Yield (rank, songname, singer, songurl) for every song in the payload.

    *html* is the JSON text (JSONP wrapper already stripped); *song_url* is
    the base URL that the album id is appended to.
    """
    content = json.loads(html)
    for entry in content["songlist"]:
        data = entry["data"]
        rank = entry["cur_count"]
        # NOTE(review): mirrors the original code, which reads 'albumname'
        # for the song title and 'albummid' for the page id — confirm
        # against the API; 'songname'/'songmid' may be the intended keys.
        songname = data["albumname"]
        singer = data["singer"][0]["name"]
        songurl = song_url + data["albummid"] + ".html"
        yield (rank, songname, singer, songurl)


def write2file(songlist):
    """Write each song tuple to the file "qqmusic_pop", one per line."""
    with open("qqmusic_pop", "w", encoding="utf-8") as f:
        for song in songlist:
            f.write(str(song) + "\n")


def main():
    # Song pages live on a different host than the top-list API; the page
    # URL is built by appending the album id to this base.
    song_url = "https://y.qq.com/n/yqq/album/"
    # Ajax endpoint; the query string is appended by get_one_page().
    url = "https://c.y.qq.com/v8/fcg-bin/fcg_v8_toplist_cp.fcg?"
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                             "Chrome/66.0.3359.181 Safari/537.36"}
    # Request the top 300 songs.
    html = get_one_page(url, headers, 300)
    if html is None:
        # BUG FIX: the original sliced html unconditionally and crashed with
        # a TypeError when the fetch failed.
        print("fetch failed")
        return
    # Strip the JSONP wrapper: the first 26 characters and the trailing ')'
    # are not part of the JSON body (per the original payload format).
    html = html[26:len(html) - 1].replace(" ", "")
    songlist = html_paser(html, song_url)
    write2file(songlist)


if __name__ == "__main__":
    main()