使用python3拆分大文件txt 图文教程

最近网站被攻击,CDN 一下子被刷掉了 2TB 流量,于是下载了日志进行分析。但日志文件有几十兆,需要先做一下切割,这里记录一下 python3 的拆分文件脚本,以备后用。

使用python3拆分大文件txt 图文教程

Python作为快速开发工具,其代码表达力强,开发效率高,因此用Python快速写一个,还是可行的。

python3代码脚本

import os
import sys
import random
import threading
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import re
import time
# Module-wide lock; download_image holds it while writing a file and printing the result.
lock = threading.Lock()
class TotalSizeCounter:
    """Running total of sizes, with every access guarded by a lock."""

    def __init__(self):
        # The lock protects reads and writes of ``total_size``.
        self.lock = threading.Lock()
        self.total_size = 0

    def add_size(self, size):
        """Add ``size`` to the running total while holding the lock."""
        self.lock.acquire()
        try:
            self.total_size += size
        finally:
            self.lock.release()

    def get_total_size(self):
        """Return the current total, read while holding the lock."""
        self.lock.acquire()
        try:
            current_total = self.total_size
        finally:
            self.lock.release()
        return current_total
# Shared counter instance: download_image adds each file's size, main reads the total.
total_size_counter = TotalSizeCounter()
# Pick a random User-Agent header value.
def generate_user_agent():
    """Return one User-Agent string chosen at random from a fixed pool.

    The pool holds eleven hard-coded strings grouped by platform; the
    grouping is only for readability and does not affect the choice.
    """
    agents_by_platform = (
        ("iOS", (
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/84.0.4147.122 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPad; CPU OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/84.0.4147.122 Mobile/15E148 Safari/604.1",
        )),
        ("Android", (
            "Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 11; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Mobile Safari/537.36",
        )),
        ("Windows", (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/94.0.992.31",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/100.0",
        )),
        ("macOS", (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_6_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_6_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
        )),
        ("Linux", (
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:95.0) Gecko/20100101 Firefox/95.0",
        )),
    )
    # Flatten in declaration order so the pool matches a single flat list.
    pool = [agent for _platform, agents in agents_by_platform for agent in agents]
    return random.choice(pool)
def download_image(url, user_agent, output_folder):
    """Download one image and save it inside ``output_folder``.

    The saved file name is the basename of the URL path with a random
    ``_<1..10000>`` suffix inserted before the extension. After the file is
    written, its size is added to ``total_size_counter`` and a summary line
    is printed.

    Args:
        url: Address of the image to fetch.
        user_agent: Value sent as the ``User-Agent`` request header.
        output_folder: Directory the file is written into.

    Returns:
        None. Any exception is caught and reported with ``print``.
    """
    try:
        headers = {'User-Agent': user_agent}
        # A timeout keeps a stalled server from blocking this worker forever.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        base_path = os.path.join(output_folder, os.path.basename(urlparse(url).path))
        stem, extension = os.path.splitext(base_path)
        filename = f"{stem}_{random.randint(1, 10000)}{extension}"
        with lock:
            with open(filename, 'wb') as file:
                file.write(response.content)
            # Measure only after the file is closed: while it is still open,
            # buffered bytes may not be on disk yet and small files would be
            # reported (and counted) as 0 bytes.
            file_size = os.path.getsize(filename)
            total_size_counter.add_size(file_size)
            print(f"Downloaded image {url} as {filename}, Size: {file_size / (1024 * 1024):.2f} MB")
    except Exception as e:
        print(f"Error downloading image {url}: {e}")
def download_images(url, user_agent, output_folder):
    """Fetch the page at ``url`` and download the images it references.

    Image URLs come from two places:

    * ``src`` attributes of ``<img>`` tags that are non-empty and do not
      start with ``data:``, ``http:`` or ``https:``; each one is resolved
      against ``url``.
    * absolute ``http(s)`` URLs ending in png/jpg/jpeg/gif/bmp found in the
      ``data-src`` attribute of ``<img>`` tags in the raw response text.

    Each image is passed to ``download_image`` on its own thread, and that
    thread is joined right away, so images are fetched one at a time.

    Args:
        url: Address of the page to scan.
        user_agent: Value sent as the ``User-Agent`` request header, also
            forwarded to ``download_image``.
        output_folder: Directory forwarded to ``download_image``.

    Returns:
        None. Any exception is caught and reported with ``print``.
    """
    def fetch_and_wait(img_url):
        # Run one download on a thread and block until it has finished.
        worker = threading.Thread(target=download_image, args=(img_url, user_agent, output_folder))
        worker.start()
        worker.join()

    try:
        headers = {'User-Agent': user_agent}
        # Without a timeout a stalled server would block this thread indefinitely.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for img_tag in soup.find_all('img'):
            img_url = img_tag.get('src')
            # NOTE(review): absolute http(s) src values are skipped here, not
            # only data: URIs -- confirm this is intended.
            if img_url and not img_url.startswith(('data:', 'http:', 'https:')):
                fetch_and_wait(urljoin(url, img_url))
        img_urls_from_text = re.findall(r'<img[^>]*data-src=["\'](https?://[^"\']+\.(?:png|jpg|jpeg|gif|bmp))["\'][^>]*>', response.text)
        for img_url in img_urls_from_text:
            fetch_and_wait(img_url)
    except Exception as e:
        print(f"Error downloading images from {url}: {e}")
def main(url, num_iterations):
    """Run ``download_images`` on ``url`` in ``num_iterations`` threads, then report and clean up.

    Each thread gets its own randomly chosen User-Agent and writes into the
    ``files/`` folder. Once every thread has finished, the total downloaded
    size and the elapsed time are printed, and the ``files`` directory is
    removed together with the files in it.

    Args:
        url: Page address passed to every ``download_images`` call.
        num_iterations: Number of threads to start.
    """
    began_at = time.time()  # start of the timed section
    if not os.path.exists("files"):
        os.makedirs("files")

    workers = []
    for _ in range(num_iterations):
        worker = threading.Thread(
            target=download_images,
            args=(url, generate_user_agent(), "files/"),
        )
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()

    elapsed_seconds = time.time() - began_at  # end of the timed section
    total_megabytes = total_size_counter.get_total_size() / (1024 * 1024)
    print(f"Total downloaded size from all threads: {total_megabytes:.2f} MB")
    print(f"Script execution time: {elapsed_seconds:.2f} seconds")

    # Delete the "files" directory and everything in it.
    if os.path.exists("files"):
        for entry in os.listdir("files"):
            os.remove(os.path.join("files", entry))
        os.rmdir("files")
if __name__ == "__main__":
    # Expect exactly two command-line arguments: the page URL and the
    # iteration count; otherwise print the usage line.
    if len(sys.argv) == 3:
        url = sys.argv[1]
        num_iterations = int(sys.argv[2])
        main(url, num_iterations)
    else:
        print("Usage: python script.py <url> <num_iterations>")

请在空目录中运行,脚本将自动在当前目录创建 files 文件夹,脚本执行完成后会删除所有下载的文件。

脚本将会计算所有下载的大小以及执行花费时间。

链接到文章: https://vpsum.com/49757.html

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注