First commit
This commit is contained in:
@@ -0,0 +1,333 @@
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
import time
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.firefox.options import Options
|
||||
from selenium.webdriver.firefox.service import Service
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
from webdriver_manager.firefox import GeckoDriverManager
|
||||
|
||||
def setup_driver(headless=True):
|
||||
"""
|
||||
Set up Selenium WebDriver with Firefox (auto-installs GeckoDriver).
|
||||
|
||||
Args:
|
||||
headless: Run browser in headless mode (no visible window)
|
||||
|
||||
Returns:
|
||||
WebDriver instance
|
||||
"""
|
||||
firefox_options = Options()
|
||||
if headless:
|
||||
firefox_options.add_argument('--headless')
|
||||
firefox_options.set_preference('general.useragent.override', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0')
|
||||
|
||||
# Auto-install GeckoDriver
|
||||
service = Service(GeckoDriverManager().install())
|
||||
driver = webdriver.Firefox(service=service, options=firefox_options)
|
||||
return driver
|
||||
|
||||
def login_with_selenium(driver, email, password):
|
||||
"""
|
||||
Log in to shop.2000ad.com using Selenium.
|
||||
|
||||
Args:
|
||||
driver: Selenium WebDriver instance
|
||||
email: Your email address
|
||||
password: Your password
|
||||
|
||||
Returns:
|
||||
True if login successful, False otherwise
|
||||
"""
|
||||
login_url = "https://shop.2000ad.com/account/sign-in"
|
||||
|
||||
print("🔐 Navigating to login page...")
|
||||
driver.get(login_url)
|
||||
|
||||
try:
|
||||
# Wait for and fill in email field
|
||||
email_field = WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.NAME, "email"))
|
||||
)
|
||||
email_field.send_keys(email)
|
||||
|
||||
# Fill in password field
|
||||
password_field = driver.find_element(By.NAME, "password")
|
||||
password_field.send_keys(password)
|
||||
|
||||
# Submit the form
|
||||
print("🔐 Logging in...")
|
||||
password_field.submit()
|
||||
|
||||
# Wait for redirect after login
|
||||
time.sleep(3)
|
||||
|
||||
# Check if login was successful
|
||||
if 'login' not in driver.current_url.lower():
|
||||
print("✅ Login successful!")
|
||||
return True
|
||||
else:
|
||||
print("❌ Login failed - check your credentials")
|
||||
return False
|
||||
|
||||
except TimeoutException:
|
||||
print("❌ Login form not found - page may have changed")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"❌ Login error: {e}")
|
||||
return False
|
||||
|
||||
def scroll_and_load_all_items(driver, downloads_url):
|
||||
"""
|
||||
Navigate to downloads page and scroll to load all items.
|
||||
|
||||
Args:
|
||||
driver: Selenium WebDriver instance
|
||||
downloads_url: URL of the downloads page
|
||||
|
||||
Returns:
|
||||
HTML content with all items loaded
|
||||
"""
|
||||
print(f"\n📄 Loading downloads page: {downloads_url}")
|
||||
driver.get(downloads_url)
|
||||
|
||||
# Wait for initial content to load
|
||||
try:
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "product"))
|
||||
)
|
||||
except TimeoutException:
|
||||
print("⚠️ Warning: No products found on page")
|
||||
return driver.page_source
|
||||
|
||||
print("📜 Scrolling to load all items...")
|
||||
|
||||
last_height = driver.execute_script("return document.body.scrollHeight")
|
||||
items_count = 0
|
||||
no_change_count = 0
|
||||
|
||||
while True:
|
||||
# Count current items
|
||||
current_items = len(driver.find_elements(By.CLASS_NAME, "product"))
|
||||
|
||||
if current_items != items_count:
|
||||
print(f" Loaded {current_items} items so far...")
|
||||
items_count = current_items
|
||||
no_change_count = 0
|
||||
else:
|
||||
no_change_count += 1
|
||||
|
||||
# Scroll to bottom
|
||||
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
|
||||
|
||||
# Wait for new content to load
|
||||
time.sleep(2)
|
||||
|
||||
# Calculate new scroll height
|
||||
new_height = driver.execute_script("return document.body.scrollHeight")
|
||||
|
||||
# Check if we've stopped loading new content
|
||||
if new_height == last_height and no_change_count >= 3:
|
||||
print(f"✅ Finished loading - found {items_count} total items")
|
||||
break
|
||||
|
||||
last_height = new_height
|
||||
|
||||
# Safety limit to prevent infinite loops
|
||||
if no_change_count >= 10:
|
||||
print(f"⚠️ Stopped scrolling after no changes - found {items_count} items")
|
||||
break
|
||||
|
||||
return driver.page_source
|
||||
|
||||
def transfer_cookies_to_requests(driver, session):
|
||||
"""
|
||||
Transfer cookies from Selenium to requests Session.
|
||||
|
||||
Args:
|
||||
driver: Selenium WebDriver instance
|
||||
session: requests.Session instance
|
||||
"""
|
||||
selenium_cookies = driver.get_cookies()
|
||||
for cookie in selenium_cookies:
|
||||
session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
|
||||
print(f"✅ Transferred {len(selenium_cookies)} cookies to requests session")
|
||||
|
||||
def download_cbz_files(email, password, output_dir='downloads', headless=True):
|
||||
"""
|
||||
Log in, load all downloads, and download all CBZ files.
|
||||
|
||||
Args:
|
||||
email: Your shop.2000ad.com email
|
||||
password: Your shop.2000ad.com password
|
||||
output_dir: Directory where files will be saved (default: 'downloads')
|
||||
headless: Run browser in headless mode (default: True)
|
||||
"""
|
||||
|
||||
# Create output directory if it doesn't exist
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Set up Selenium
|
||||
print("🌐 Starting browser...")
|
||||
driver = setup_driver(headless=headless)
|
||||
|
||||
try:
|
||||
# Log in
|
||||
if not login_with_selenium(driver, email, password):
|
||||
print("\n❌ Cannot proceed without successful login")
|
||||
return
|
||||
|
||||
# Load downloads page with infinite scroll
|
||||
downloads_url = "https://shop.2000ad.com/account/downloads"
|
||||
html_content = scroll_and_load_all_items(driver, downloads_url)
|
||||
|
||||
# Parse the HTML
|
||||
soup = BeautifulSoup(html_content, 'html.parser')
|
||||
products = soup.find_all('li', class_='product')
|
||||
|
||||
print(f"\n📚 Found {len(products)} products to process\n")
|
||||
|
||||
# Create requests session and transfer cookies
|
||||
session = requests.Session()
|
||||
session.headers.update({
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
'Referer': 'https://shop.2000ad.com/'
|
||||
})
|
||||
|
||||
transfer_cookies_to_requests(driver, session)
|
||||
|
||||
# Close browser - we don't need it anymore
|
||||
driver.quit()
|
||||
print("✅ Browser closed\n")
|
||||
|
||||
print(f"{'='*50}")
|
||||
print("Starting downloads...\n")
|
||||
|
||||
downloaded = 0
|
||||
skipped = 0
|
||||
failed = 0
|
||||
|
||||
for product in products:
|
||||
# Get product name for better logging
|
||||
product_name = product.get('data-name', 'Unknown')
|
||||
|
||||
# Get publication date
|
||||
release_date = product.get('data-released', '')
|
||||
date_str = ''
|
||||
if release_date:
|
||||
# Format: YYYYMMDDHHMMSS -> YYYY-MM-DD
|
||||
try:
|
||||
date_str = f"{release_date[0:4]}-{release_date[4:6]}-{release_date[6:8]}"
|
||||
except:
|
||||
date_str = ''
|
||||
|
||||
# Determine subdirectory based on product name
|
||||
if 'megazine' in product_name.lower():
|
||||
product_output_dir = os.path.join(output_dir, 'Megazine')
|
||||
else:
|
||||
product_output_dir = os.path.join(output_dir, '2000ad')
|
||||
|
||||
# Create subdirectory if it doesn't exist
|
||||
os.makedirs(product_output_dir, exist_ok=True)
|
||||
|
||||
# Find all forms within this product
|
||||
forms = product.find_all('form')
|
||||
|
||||
for form in forms:
|
||||
# Check if this form is for a CBZ download
|
||||
button = form.find('button', type='submit')
|
||||
if button and 'CBZ' in button.get_text():
|
||||
# Get the download URL
|
||||
download_url = form.get('action')
|
||||
|
||||
if download_url:
|
||||
# Create a safe filename with date
|
||||
if date_str:
|
||||
filename = f"{date_str} - {product_name}.cbz"
|
||||
else:
|
||||
filename = f"{product_name}.cbz"
|
||||
|
||||
filename = filename.replace('/', '-').replace('\\', '-').replace(':', '-')
|
||||
filepath = os.path.join(product_output_dir, filename)
|
||||
|
||||
# Check if file already exists
|
||||
if os.path.exists(filepath):
|
||||
subdir = 'Megazine' if 'megazine' in product_name.lower() else '2000ad'
|
||||
print(f"⏭️ Skipping (already exists): {subdir}/{filename}")
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
subdir = 'Megazine' if 'megazine' in product_name.lower() else '2000ad'
|
||||
print(f"📥 Downloading to {subdir}/: {filename}")
|
||||
|
||||
# Download the file
|
||||
response = session.get(download_url, stream=True, allow_redirects=True)
|
||||
|
||||
# Check if we got HTML (login page) instead of CBZ
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if 'text/html' in content_type:
|
||||
print(f"⚠️ Warning: Got HTML response instead of file")
|
||||
print(f" This might be a permission issue or the file isn't available")
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
# Save the file
|
||||
with open(filepath, 'wb') as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
file_size = os.path.getsize(filepath)
|
||||
print(f"✅ Saved to {subdir}/: {filename} ({file_size / 1024 / 1024:.2f} MB)")
|
||||
downloaded += 1
|
||||
|
||||
# Be polite - add a small delay between downloads
|
||||
time.sleep(1)
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"❌ Error downloading {filename}: {e}")
|
||||
failed += 1
|
||||
except Exception as e:
|
||||
print(f"❌ Error saving {filename}: {e}")
|
||||
failed += 1
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print(f"Download complete!")
|
||||
print(f"✅ Successfully downloaded: {downloaded}")
|
||||
print(f"⏭️ Skipped (already exist): {skipped}")
|
||||
print(f"❌ Failed: {failed}")
|
||||
print(f"📁 Files saved to: {os.path.abspath(output_dir)}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Fatal error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
# Make sure browser is closed
|
||||
try:
|
||||
driver.quit()
|
||||
except:
|
||||
pass
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Configuration - can be set via environment variables or directly
|
||||
email = os.environ.get('EMAIL', 'your_email@example.com')
|
||||
password = os.environ.get('PASSWORD', 'your_password')
|
||||
output_dir = os.environ.get('OUTPUT_DIR', 'downloads')
|
||||
|
||||
if email == 'your_email@example.com' or password == 'your_password':
|
||||
print("⚠️ Warning: Please set EMAIL and PASSWORD environment variables")
|
||||
print(" Example: EMAIL=your@email.com PASSWORD=yourpass python download_cbz.py")
|
||||
exit(1)
|
||||
|
||||
# Run the download
|
||||
# Set headless=False if you want to see the browser window
|
||||
download_cbz_files(email, password, output_dir=output_dir, headless=True)
|
||||
Reference in New Issue
Block a user