python抖音批量下载作者所有视频源码,纯官方api,无第三方接口,直接放源码文件已重构,旧版本主代码依旧可用,新版本代码,改为多文件了,可以直接压缩包下载。
直接运行,然后输入分享复制的链接就好
2022.5.20日更新
1. 增加重试,如果下载失败,会重试3次,每次会随机一个新的user-agent,如果还是失败会保存失败的链接到作者目录的"下载失败视频.txt"
2. 评论区指出的作者名称有非法字符文件夹建立失败问题,增加了正则过滤,非法字符串替换为_
3. 下载模式添加了关键字匹配,匹配视频的简介文字
4. 添加下载进度条和视频序号
2022.5.21日更新
1. 添加下载成功的视频不会二次下载,原理是下载完成后会在作者同级文件夹写入下载成功的视频唯一id,下载前检查是否有此id,有的话就跳过。
2. 添加图集下载,会自己判断作品是否为图集,如果是的话,会在作者同级目录下建立一个新的文件夹,以序号存储图片
3. 修复了当文件名不合法时,存储为0byte文件的问题
2022.5.22日更新
1. 增加批量下载模式,请在代码或者单文件程序同级目录下放置一个 ”作者主页链接.txt“ 的文件,格式为每行一个作者主页链接,批量模式是检测到该文件有内容时自动触发,如果想再次使用单作者批量下载请清空文件内容或者删除文件即可
2022.5.23日更新
1. 根据 话痨司机啊 的建议,修改存储已下载视频id的方式改为数据库存储,文件为同级目录下的 "douyin.db" 。如果不想重复下载视频,请不要删除这个文件,或者你想下载之前下载成功的视频,请删除这个文件
2. 解决了部分作者无法下载的问题,原因为小概率会出现视频不存在,丢失视频vid,然后直接报错keyerror
3. 解决了当部分视频简介出现"\n"换行符时导致文件无法创建的问题
2022.5.25日更新
1. 感谢 话痨司机啊 的推荐,使用 gooey 库重构为图形界面,由于界面的原因,代码分为了三个文件,帖子的代码依旧可用,新版本代码改为压缩包分享,下面可下载
2. 修复了部分作者拉取报错的问题,控制台版本也修复了这个问题,可以直接复制使用
2022.5.26日更新
1. 再再再次更新,修复了一个老bug了,这个bug前前后后修了三四次了,这次应该是彻底修复了。就是大家下载的时候需要一个sec_uid参数,这个参数的来源是短链接跳转后的长链接里面的一个参数,前前后后我用了正则,数组,都有bug导致提取不到该参数,今天我才发现有个自带的urllib.parse可以直接把path转为字典,这样就不会解析错误了,我是傻逼....
import linecache import os import re from faker import Faker from concurrent.futures import ThreadPoolExecutor, as_completed from requests.adapters import HTTPAdapter from tqdm import tqdm import requests import sqlite3 import urllib.parse def creat_table(table_name): conn = sqlite3.connect('douyin.db') c = conn.cursor() c.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name} (ID INTEGER PRIMARY KEY AUTOINCREMENT,VID TEXT NOT NULL);''') conn.commit() conn.close() def insert_data(table_name, vid): conn = sqlite3.connect('douyin.db') c = conn.cursor() cursor = c.execute(f"SELECT vid from t_{table_name}") already_have = False for row in cursor: if vid in row: already_have = True if already_have is False: c.execute(f"INSERT INTO t_{table_name} (ID,VID) VALUES (null,{vid})") conn.commit() conn.close() def selet_data(table_name): conn = sqlite3.connect('douyin.db') c = conn.cursor() cursor = c.execute(f"SELECT VID from t_{table_name}") vid_list = [out_exp[0] for out_exp in cursor] return vid_list class Douyin: def __init__(self, url): self.share_url = url self.headers = { 'User-Agent': "Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420+ (KHTML, like Gecko) Version/3.0 Mobile/1C28 Safari/419.3" } self.sec_uid = None self.uid = None self.nick_name = None def get_user_info(self): resp = requests.get(self.share_url, headers=self.headers) self.sec_uid = 'sec_uid=' + urllib.parse.parse_qs(urllib.parse.urlparse(resp.url).query)['sec_uid'][0] user_info = f'https://www.iesdouyin.com/web/api/v2/user/info/?{self.sec_uid}' resp = requests.get(user_info, headers=self.headers) user_data = { 'signature': resp.json()['user_info']['signature'], 'nickname': resp.json()['user_info']['nickname'], 'aweme_count': resp.json()['user_info']['aweme_count'], 'following_count': resp.json()['user_info']['following_count'], 'total_favorited': resp.json()['user_info']['total_favorited'], 'avatar': resp.json()['user_info']['avatar_larger']['url_list'][0], } self.uid = resp.json()['user_info']['uid'] self.nick_name = re.sub(r'[<|>\/:"*?]', '_', resp.json()['user_info']['nickname']) creat_table(self.uid) return user_data def get_all_video(self): max_cursor = 0 video_has_more = True all_video_list = [] if self.sec_uid is None: self.get_user_info() while video_has_more is True: json_url = f'https://www.iesdouyin.com/web/api/v2/aweme/post/?{self.sec_uid}&' \ f'count=21&max_cursor={max_cursor}' resp = requests.get(json_url, headers=self.headers) video_has_more = resp.json()['has_more'] max_cursor = resp.json()['max_cursor'] video_list = resp.json()['aweme_list'] for i in video_list: try: all_video_list.append({'desc': i['desc'], 'vid': i['video']['vid'], 'aweme_id': i['aweme_id']}) except KeyError: all_video_list.append({'desc': i['desc'], 'vid': None, 'aweme_id': i['aweme_id']}) return all_video_list def down_video(self, down_info, index): alreday_down = False retry = 0 session = requests.session() session.mount('https://', HTTPAdapter(max_retries=5)) alreday_down_video = selet_data(self.uid) for n in alreday_down_video: if down_info['aweme_id'] in n: alreday_down = True break if alreday_down is False: if os.path.exists(self.nick_name) is False: try: os.makedirs(self.nick_name) except FileExistsError: pass if down_info['desc'] == '': down_info['desc'] = down_info['aweme_id'] down_info['desc'] = re.sub(r'[<|>\/:"*?\n]', '_', down_info['desc']) save_name = f'{self.nick_name}/{index.zfill(2)}_{down_info["desc"]}.mp4' if down_info["vid"]: download_url = f'https://aweme.snssdk.com/aweme/v1/play/?video_id={down_info["vid"]}&ratio=1080p' response = session.get(download_url, headers=self.headers, stream=True) while response.content == b'' and retry < 3: self.headers = { 'User-Agent': Faker().chrome() } response = session.get(download_url, headers=self.headers) retry += 1 if response.content: with open(save_name, 'wb') as file: file.write(response.content) insert_data(self.uid, down_info['aweme_id']) else: with open(f'{self.nick_name}/下载失败视频.txt', 'a+') as file: file.write(download_url + '\n') else: n = 0 img_json_url = f'https://www.douyin.com/web/api/v2/aweme/iteminfo/?item_ids={down_info["aweme_id"]}' if os.path.exists(f'{self.nick_name}/{down_info["desc"]}') is False: try: os.makedirs(f'{self.nick_name}/{down_info["desc"]}') except FileExistsError: pass response = session.get(img_json_url, headers=self.headers) for i in response.json()["item_list"][0]["images"]: n += 1 img_url = i["url_list"][0] img_content = session.get(img_url, headers=self.headers).content img_save_name = f'{self.nick_name}/{down_info["desc"]}/{n}.jpg' with open(img_save_name, 'wb') as file: file.write(img_content) def main(share_url, down_all=False): share_url = re.search(r'[a-zA-z]+://[^\s]*', share_url).group() douyin = Douyin(share_url) info = douyin.get_user_info() print(f'作者:{info["nickname"]}\n视频数:{info["aweme_count"]}\n{"-" * 20}\n拉取作者所有作品中...') down_list = douyin.get_all_video() down_mode = '1' if not down_all: down_mode = input(f'{"-" * 20}\n选择下载模式:\n1.全部下载\n2.关键词匹配下载\n') def down_task(down_load): with ThreadPoolExecutor(max_workers=10) as t: obj_list = [] for i in range((len(down_load))): obj = t.submit(douyin.down_video, down_load[i], str(i)) obj_list.append(obj) with tqdm(total=len(down_load), ncols=100) as bar: for x in as_completed(obj_list): bar.update(1) if down_mode == '1': if down_list: down_task(down_list) else: print(f'无视频可下载') elif down_mode == '2': k = 0 filter_down_list = [] keyword = input('请输入关键词:') for v in down_list: if keyword in v['desc']: filter_down_list.append(v) if len(filter_down_list) == 0: print('无匹配记录') else: print(f'共找到 {len(filter_down_list)} 条匹配记录, 开始下载') down_task(filter_down_list) else: print('输入错误') if __name__ == '__main__': share_url_list = linecache.getlines('作者主页链接.txt') if share_url_list: print('已检测到有批量下载文件,进入批量下载模式\n') if input('请选择批量下载模式:\n1.下载所有作者所有视频\n2.手动选择每个作者下载模式\n') == '1': down_all = True else: down_all = False for i in share_url_list: main(i, down_all) print(f'当前任务完成\n{"*" * 30}\n') input('所有任务已完成') else: url = input('输入作者主页分享链接:') main(url) input('任务已完成')
版权声明
版权说明: 仅限用于学习和研究目的;不得将上述内容用于商业和非法用途!否则一切后果自负。我们非常重视版权问题,如有侵权请邮件至(171373236#qq.com)与我们联系处理,敬请谅解!