commit 30c7a77
src
├─art.py
├─cache.py
├─classes
│ ├─AFM.py
│ ├─Outreach.py
│ ├─Tts.py
│ ├─Twitter.py
│ └─YouTube.py
├─config.py
├─constants.py
├─cron.py
├─main.py
├─status.py
└─utils.py
from config import ROOT_DIR
from termcolor import colored
def print_banner() -> None:
"""
Prints the introductory ASCII Art Banner.
Returns:
None
"""
with open(f"{ROOT_DIR}/assets/banner.txt", "r") as file:
print(colored(file.read(), "green"))
import os
import json
from typing import List
from config import ROOT_DIR
def get_cache_path() -> str:
"""
Gets the path to the cache file.
Returns:
path (str): The path to the cache folder
"""
return os.path.join(ROOT_DIR, '.mp')
def get_afm_cache_path() -> str:
"""
Gets the path to the Affiliate Marketing cache file.
Returns:
path (str): The path to the AFM cache folder
"""
return os.path.join(get_cache_path(), 'afm.json')
def get_twitter_cache_path() -> str:
"""
Gets the path to the Twitter cache file.
Returns:
path (str): The path to the Twitter cache folder
"""
return os.path.join(get_cache_path(), 'twitter.json')
def get_youtube_cache_path() -> str:
"""
Gets the path to the YouTube cache file.
Returns:
path (str): The path to the YouTube cache folder
"""
return os.path.join(get_cache_path(), 'youtube.json')
def get_accounts(provider: str) -> List[dict]:
"""
Gets the accounts from the cache.
Args:
provider (str): The provider to get the accounts for
Returns:
account (List[dict]): The accounts
"""
cache_path = ""
if provider == "twitter":
cache_path = get_twitter_cache_path()
elif provider == "youtube":
cache_path = get_youtube_cache_path()
if not os.path.exists(cache_path):
# Create the cache file
with open(cache_path, 'w') as file:
json.dump({
"accounts": []
}, file, indent=4)
with open(cache_path, 'r') as file:
parsed = json.load(file)
if parsed is None:
return []
if 'accounts' not in parsed:
return []
# Get accounts dictionary
return parsed['accounts']
def add_account(provider: str, account: dict) -> None:
"""
Adds an account to the cache.
Args:
account (dict): The account to add
Returns:
None
"""
if provider == "twitter":
# Get the current accounts
accounts = get_accounts("twitter")
# Add the new account
accounts.append(account)
# Write the new accounts to the cache
with open(get_twitter_cache_path(), 'w') as file:
json.dump({
"accounts": accounts
}, file, indent=4)
elif provider == "youtube":
# Get the current accounts
accounts = get_accounts("youtube")
# Add the new account
accounts.append(account)
# Write the new accounts to the cache
with open(get_youtube_cache_path(), 'w') as file:
json.dump({
"accounts": accounts
}, file, indent=4)
def remove_account(account_id: str) -> None:
"""
Removes an account from the cache.
Args:
account_id (str): The ID of the account to remove
Returns:
None
"""
# Get the current accounts
accounts = get_accounts()
# Remove the account
accounts = [account for account in accounts if account['id'] != account_id]
# Write the new accounts to the cache
with open(get_twitter_cache_path(), 'w') as file:
json.dump({
"accounts": accounts
}, file, indent=4)
def get_products() -> List[dict]:
"""
Gets the products from the cache.
Returns:
products (List[dict]): The products
"""
if not os.path.exists(get_afm_cache_path()):
# Create the cache file
with open(get_afm_cache_path(), 'w') as file:
json.dump({
"products": []
}, file, indent=4)
with open(get_afm_cache_path(), 'r') as file:
parsed = json.load(file)
# Get the products
return parsed["products"]
def add_product(product: dict) -> None:
"""
Adds a product to the cache.
Args:
product (dict): The product to add
Returns:
None
"""
# Get the current products
products = get_products()
# Add the new product
products.append(product)
# Write the new products to the cache
with open(get_afm_cache_path(), 'w') as file:
json.dump({
"products": products
}, file, indent=4)
def get_results_cache_path() -> str:
"""
Gets the path to the results cache file.
Returns:
path (str): The path to the results cache folder
"""
return os.path.join(get_cache_path(), 'scraper_results.csv')
import g4f
from status import *
from config import *
from constants import *
from .Twitter import Twitter
from selenium_firefox import *
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from webdriver_manager.firefox import GeckoDriverManager
class AffiliateMarketing:
"""
This class will be used to handle all the affiliate marketing related operations.
"""
def __init__(self, affiliate_link: str, fp_profile_path: str, twitter_account_uuid: str, account_nickname: str, topic: str) -> None:
"""
Initializes the Affiliate Marketing class.
Args:
affiliate_link (str): The affiliate link
fp_profile_path (str): The path to the Firefox profile
twitter_account_uuid (str): The Twitter account UUID
account_nickname (str): The account nickname
topic (str): The topic of the product
Returns:
None
"""
self._fp_profile_path: str = fp_profile_path
# Initialize the Firefox profile
self.options: Options = Options()
# Set headless state of browser
if get_headless():
self.options.add_argument("--headless")
# Set the profile path
self.options.add_argument("-profile")
self.options.add_argument(fp_profile_path)
# Set the service
self.service: Service = Service(GeckoDriverManager().install())
# Initialize the browser
self.browser: webdriver.Firefox = webdriver.Firefox(service=self.service, options=self.options)
# Set the affiliate link
self.affiliate_link: str = affiliate_link
# Set the Twitter account UUID
self.account_uuid: str = twitter_account_uuid
# Set the Twitter account nickname
self.account_nickname: str = account_nickname
# Set the Twitter topic
self.topic: str = topic
# Scrape the product information
self.scrape_product_information()
def scrape_product_information(self) -> None:
"""
This method will be used to scrape the product
information from the affiliate link.
"""
# Open the affiliate link
self.browser.get(self.affiliate_link)
# Get the product name
product_title: str = self.browser.find_element(By.ID, AMAZON_PRODUCT_TITLE_ID).text
# Get the features of the product
features: any = self.browser.find_elements(By.ID, AMAZON_FEATURE_BULLETS_ID)
if get_verbose():
info(f"Product Title: {product_title}")
if get_verbose():
info(f"Features: {features}")
# Set the product title
self.product_title: str = product_title
# Set the features
self.features: any = features
def generate_response(self, prompt: str) -> str:
"""
This method will be used to generate the response for the user.
Args:
prompt (str): The prompt for the user.
Returns:
response (str): The response for the user.
"""
# Generate the response
response: str = g4f.ChatCompletion.create(
model=parse_model(get_model()),
messages=[
{
"role": "user",
"content": prompt
}
]
)
# Return the response
return response
def generate_pitch(self) -> str:
"""
This method will be used to generate a pitch for the product.
Returns:
pitch (str): The pitch for the product.
"""
# Generate the response
pitch: str = self.generate_response(f"I want to promote this product on my website. Generate a brief pitch about this product, return nothing else except the pitch. Information:\nTitle: \"{self.product_title}\"\nFeatures: \"{str(self.features)}\"") + "\nYou can buy the product here: " + self.affiliate_link
self.pitch: str = pitch
# Return the response
return pitch
def share_pitch(self, where: str) -> None:
"""
This method will be used to share the pitch on the specified platform.
Args:
where (str): The platform where the pitch will be shared.
"""
if where == "twitter":
# Initialize the Twitter class
twitter: Twitter = Twitter(self.account_uuid, self.account_nickname, self._fp_profile_path, self.topic)
# Share the pitch
twitter.post(self.pitch)
def quit(self) -> None:
"""
This method will be used to quit the browser.
"""
# Quit the browser
self.browser.quit()
import os
import io
import re
import csv
import time
import zipfile
import yagmail
import requests
import subprocess
from cache import *
from status import *
from config import *
class Outreach:
"""
Class that houses the methods to reach out to businesses.
"""
def __init__(self) -> None:
"""
Constructor for the Outreach class.
Returns:
None
"""
# Check if go is installed
self.go_installed = os.system("go version") == 0
# Set niche
self.niche = get_google_maps_scraper_niche()
# Set email credentials
self.email_creds = get_email_credentials()
def is_go_installed(self) -> bool:
"""
Check if go is installed.
Returns:
bool: True if go is installed, False otherwise.
"""
# Check if go is installed
try:
subprocess.call("go version", shell=True)
return True
except Exception as e:
return False
def unzip_file(self, zip_link: str) -> None:
"""
Unzip the file.
Args:
zip_link (str): The link to the zip file.
Returns:
None
"""
# Check if the scraper is already unzipped, if not, unzip it
if os.path.exists("google-maps-scraper-0.9.7"):
info("=> Scraper already unzipped. Skipping unzip.")
return
r = requests.get(zip_link)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall()
def build_scraper(self) -> None:
"""
Build the scraper.
Returns:
None
"""
# Check if the scraper is already built, if not, build it
if os.path.exists("google-maps-scraper.exe"):
print(colored("=> Scraper already built. Skipping build.", "blue"))
return
os.chdir("google-maps-scraper-0.9.7")
os.system("go mod download")
os.system("go build")
os.system("mv google-maps-scraper ../google-maps-scraper")
os.chdir("..")
def run_scraper_with_args_for_30_seconds(self, args: str, timeout = 300) -> None:
"""
Run the scraper with the specified arguments for 30 seconds.
Args:
args (str): The arguments to run the scraper with.
timeout (int): The time to run the scraper for.
Returns:
None
"""
# Run the scraper with the specified arguments
info(" => Running scraper...")
command = "google-maps-scraper " + args
try:
scraper_process = subprocess.call(command.split(" "), shell=True, timeout=float(timeout))
if scraper_process == 0:
subprocess.call("taskkill /f /im google-maps-scraper.exe", shell=True)
print(colored("=> Scraper finished successfully.", "green"))
else:
subprocess.call("taskkill /f /im google-maps-scraper.exe", shell=True)
print(colored("=> Scraper finished with an error.", "red"))
except Exception as e:
subprocess.call("taskkill /f /im google-maps-scraper.exe", shell=True)
print(colored("An error occurred while running the scraper:", "red"))
print(str(e))
def get_items_from_file(self, file_name: str) -> list:
"""
Read and return items from a file.
Args:
file_name (str): The name of the file to read from.
Returns:
list: The items from the file.
"""
# Read and return items from a file
with open(file_name, "r", errors="ignore") as f:
items = f.readlines()
items = [item.strip() for item in items[1:]]
return items
def set_email_for_website(self, index: int, website: str, output_file: str):
"""Extracts an email address from a website and updates a CSV file with it.
This method sends a GET request to the specified website, searches for the
first email address in the HTML content, and appends it to the specified
row in a CSV file. If no email address is found, no changes are made to
the CSV file.
Args:
index (int): The row index in the CSV file where the email should be appended.
website (str): The URL of the website to extract the email address from.
output_file (str): The path to the CSV file to update with the extracted email."""
# Extract and set an email for a website
email = ""
r = requests.get(website)
if r.status_code == 200:
# Define a regular expression pattern to match email addresses
email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b"
# Find all email addresses in the HTML string
email_addresses = re.findall(email_pattern, r.text)
email = email_addresses[0] if len(email_addresses) > 0 else ""
if email:
print(f"=> Setting email {email} for website {website}")
with open(output_file, "r", newline="", errors="ignore") as csvfile:
csvreader = csv.reader(csvfile)
items = list(csvreader)
items[index].append(email)
with open(output_file, "w", newline="", errors="ignore") as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerows(items)
def start(self) -> None:
"""
Start the outreach process.
Returns:
None
"""
# Check if go is installed
if not self.is_go_installed():
error("Go is not installed. Please install go and try again.")
return
# Unzip the scraper
self.unzip_file(get_google_maps_scraper_zip_url())
# Build the scraper
self.build_scraper()
# Write the niche to a file
with open("niche.txt", "w") as f:
f.write(self.niche)
output_path = get_results_cache_path()
message_subject = get_outreach_message_subject()
message_body = get_outreach_message_body_file()
# Run
self.run_scraper_with_args_for_30_seconds(f"-input niche.txt -results \"{output_path}\"", timeout=get_scraper_timeout())
# Get the items from the file
items = self.get_items_from_file(output_path)
success(f" => Scraped {len(items)} items.")
# Remove the niche file
os.remove("niche.txt")
time.sleep(2)
# Create a yagmail SMTP client outside the loop
yag = yagmail.SMTP(user=self.email_creds["username"], password=self.email_creds["password"], host=self.email_creds["smtp_server"], port=self.email_creds["smtp_port"])
# Get the email for each business
for item in items:
try:
# Check if the item"s website is valid
website = item.split(",")
website = [w for w in website if w.startswith("http")]
website = website[0] if len(website) > 0 else ""
if website != "":
test_r = requests.get(website)
if test_r.status_code == 200:
self.set_email_for_website(items.index(item), website, output_path)
# Send emails using the existing SMTP connection
receiver_email = item.split(",")[-1]
if "@" not in receiver_email:
warning(f" => No email provided. Skipping...")
continue
subject = message_subject.replace("{{COMPANY_NAME}}", item[0])
body = open(message_body, "r").read().replace("{{COMPANY_NAME}}", item[0])
info(f" => Sending email to {receiver_email}...")
yag.send(
to=receiver_email,
subject=subject,
contents=body,
)
success(f" => Sent email to {receiver_email}")
else:
warning(f" => Website {website} is invalid. Skipping...")
except Exception as err:
error(f" => Error: {err}...")
continue
import os
import sys
import site
from config import ROOT_DIR
from TTS.utils.manage import ModelManager
from TTS.utils.synthesizer import Synthesizer
class TTS:
"""
Class for Text-to-Speech using Coqui TTS.
"""
def __init__(self) -> None:
"""
Initializes the TTS class.
Returns:
None
"""
# Detect virtual environment site packages
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
# We're in a virtual environment
site_packages = site.getsitepackages()[0]
else:
# We're not in a virtual environment, use the user's site packages
site_packages = site.getusersitepackages()
# Path to the .models.json file
models_json_path = os.path.join(
site_packages,
"TTS",
".models.json",
)
# Create directory if it doesn't exist
tts_dir = os.path.dirname(models_json_path)
if not os.path.exists(tts_dir):
os.makedirs(tts_dir)
# Initialize the ModelManager
self._model_manager = ModelManager(models_json_path)
# Download tts_models/en/ljspeech/fast_pitch
self._model_path, self._config_path, self._model_item = \
self._model_manager.download_model("tts_models/en/ljspeech/tacotron2-DDC_ph")
# Download vocoder_models/en/ljspeech/hifigan_v2 as our vocoder
voc_path, voc_config_path, _ = self._model_manager. \
download_model("vocoder_models/en/ljspeech/univnet")
# Initialize the Synthesizer
self._synthesizer = Synthesizer(
tts_checkpoint=self._model_path,
tts_config_path=self._config_path,
vocoder_checkpoint=voc_path,
vocoder_config=voc_config_path
)
@property
def synthesizer(self) -> Synthesizer:
"""
Returns the synthesizer.
Returns:
Synthesizer: The synthesizer.
"""
return self._synthesizer
def synthesize(self, text: str, output_file: str = os.path.join(ROOT_DIR, ".mp", "audio.wav")) -> str:
"""
Synthesizes the given text into speech.
Args:
text (str): The text to synthesize.
output_file (str, optional): The output file to save the synthesized speech. Defaults to "audio.wav".
Returns:
str: The path to the output file.
"""
# Synthesize the text
outputs = self.synthesizer.tts(text)
# Save the synthesized speech to the output file
self.synthesizer.save_wav(outputs, output_file)
return output_file
import re
import g4f
import sys
import time
from cache import *
from config import *
from status import *
from constants import *
from typing import List
from datetime import datetime
from termcolor import colored
from selenium_firefox import *
from selenium import webdriver
from selenium.common import exceptions
from selenium.webdriver.common import keys
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from webdriver_manager.firefox import GeckoDriverManager
class Twitter:
"""
Class for the Bot, that grows a Twitter account.
"""
def __init__(self, account_uuid: str, account_nickname: str, fp_profile_path: str, topic: str) -> None:
"""
Initializes the Twitter Bot.
Args:
account_uuid (str): The account UUID
account_nickname (str): The account nickname
fp_profile_path (str): The path to the Firefox profile
Returns:
None
"""
self.account_uuid: str = account_uuid
self.account_nickname: str = account_nickname
self.fp_profile_path: str = fp_profile_path
self.topic: str = topic
# Initialize the Firefox profile
self.options: Options = Options()
# Set headless state of browser
if get_headless():
self.options.add_argument("--headless")
# Set the profile path
self.options.add_argument("-profile")
self.options.add_argument(fp_profile_path)
# Set the service
self.service: Service = Service(GeckoDriverManager().install())
# Initialize the browser
self.browser: webdriver.Firefox = webdriver.Firefox(service=self.service, options=self.options)
def post(self, text: str = None) -> None:
"""
Starts the Twitter Bot.
Args:
text (str): The text to post
Returns:
None
"""
bot: webdriver.Firefox = self.browser
verbose: bool = get_verbose()
bot.get("https://twitter.com")
time.sleep(2)
post_content: str = self.generate_post()
now: datetime = datetime.now()
print(colored(f" => Posting to Twitter:", "blue"), post_content[:30] + "...")
try:
bot.find_element(By.XPATH, "//a[@data-testid='SideNav_NewTweet_Button']").click()
except exceptions.NoSuchElementException:
time.sleep(3)
bot.find_element(By.XPATH, "//a[@data-testid='SideNav_NewTweet_Button']").click()
time.sleep(2)
body = post_content if text is None else text
try:
bot.find_element(By.XPATH, "//div[@role='textbox']").send_keys(body)
except exceptions.NoSuchElementException:
time.sleep(2)
bot.find_element(By.XPATH, "//div[@role='textbox']").send_keys(body)
time.sleep(1)
bot.find_element(By.CLASS_NAME, "notranslate").send_keys(keys.Keys.ENTER)
bot.find_element(By.XPATH, "//div[@data-testid='tweetButton']").click()
if verbose:
print(colored(" => Pressed [ENTER] Button on Twitter..", "blue"))
time.sleep(4)
# Add the post to the cache
self.add_post({
"content": post_content,
"date": now.strftime("%m/%d/%Y, %H:%M:%S")
})
success("Posted to Twitter successfully!")
def get_posts(self) -> List[dict]:
"""
Gets the posts from the cache.
Returns:
posts (List[dict]): The posts
"""
if not os.path.exists(get_twitter_cache_path()):
# Create the cache file
with open(get_twitter_cache_path(), 'w') as file:
json.dump({
"posts": []
}, file, indent=4)
with open(get_twitter_cache_path(), 'r') as file:
parsed = json.load(file)
# Find our account
accounts = parsed["accounts"]
for account in accounts:
if account["id"] == self.account_uuid:
posts = account["posts"]
if posts is None:
return []
# Return the posts
return posts
def add_post(self, post: dict) -> None:
"""
Adds a post to the cache.
Args:
post (dict): The post to add
Returns:
None
"""
posts = self.get_posts()
posts.append(post)
with open(get_twitter_cache_path(), "r") as file:
previous_json = json.loads(file.read())
# Find our account
accounts = previous_json["accounts"]
for account in accounts:
if account["id"] == self.account_uuid:
account["posts"].append(post)
# Commit changes
with open(get_twitter_cache_path(), "w") as f:
f.write(json.dumps(previous_json))
def generate_post(self) -> str:
"""
Generates a post for the Twitter account based on the topic.
Returns:
post (str): The post
"""
completion = g4f.ChatCompletion.create(
model=parse_model(get_model()),
messages=[
{
"role": "user",
"content": f"Generate a Twitter post about: {self.topic} in {get_twitter_language()}. The Limit is 2 sentences. Choose a specific sub-topic of the provided topic."
}
]
)
if get_verbose():
info("Generating a post...")
if completion is None:
error("Failed to generate a post. Please try again.")
sys.exit(1)
# Apply Regex to remove all *
completion = re.sub(r"\*", "", completion).replace("\"", "")
if get_verbose():
info(f"Length of post: {len(completion)}")
if len(completion) >= 260:
return self.generate_post()
return completion
import re
import g4f
import json
import time
import requests
import assemblyai as aai
from utils import *
from cache import *
from .Tts import TTS
from config import *
from status import *
from uuid import uuid4
from constants import *
from typing import List
from moviepy.editor import *
from termcolor import colored
from selenium_firefox import *
from selenium import webdriver
from moviepy.video.fx.all import crop
from moviepy.config import change_settings
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from moviepy.video.tools.subtitles import SubtitlesClip
from webdriver_manager.firefox import GeckoDriverManager
from datetime import datetime
# Set ImageMagick Path
change_settings({"IMAGEMAGICK_BINARY": get_imagemagick_path()})
class YouTube:
"""
Class for YouTube Automation.
Steps to create a YouTube Short:
1. Generate a topic [DONE]
2. Generate a script [DONE]
3. Generate metadata (Title, Description, Tags) [DONE]
4. Generate AI Image Prompts [DONE]
4. Generate Images based on generated Prompts [DONE]
5. Convert Text-to-Speech [DONE]
6. Show images each for n seconds, n: Duration of TTS / Amount of images [DONE]
7. Combine Concatenated Images with the Text-to-Speech [DONE]
"""
def __init__(self, account_uuid: str, account_nickname: str, fp_profile_path: str, niche: str, language: str) -> None:
"""
Constructor for YouTube Class.
Args:
account_uuid (str): The unique identifier for the YouTube account.
account_nickname (str): The nickname for the YouTube account.
fp_profile_path (str): Path to the firefox profile that is logged into the specificed YouTube Account.
niche (str): The niche of the provided YouTube Channel.
language (str): The language of the Automation.
Returns:
None
"""
self._account_uuid: str = account_uuid
self._account_nickname: str = account_nickname
self._fp_profile_path: str = fp_profile_path
self._niche: str = niche
self._language: str = language
self.images = []
# Initialize the Firefox profile
self.options: Options = Options()
# Set headless state of browser
if get_headless():
self.options.add_argument("--headless")
profile = webdriver.FirefoxProfile(self._fp_profile_path)
self.options.profile = profile
# Set the service
self.service: Service = Service(GeckoDriverManager().install())
# Initialize the browser
self.browser: webdriver.Firefox = webdriver.Firefox(service=self.service, options=self.options)
@property
def niche(self) -> str:
"""
Getter Method for the niche.
Returns:
niche (str): The niche
"""
return self._niche
@property
def language(self) -> str:
"""
Getter Method for the language to use.
Returns:
language (str): The language
"""
return self._language
def generate_response(self, prompt: str, model: any = None) -> str:
"""
Generates an LLM Response based on a prompt and the user-provided model.
Args:
prompt (str): The prompt to use in the text generation.
Returns:
response (str): The generated AI Repsonse.
"""
if not model:
return g4f.ChatCompletion.create(
model=parse_model(get_model()),
messages=[
{
"role": "user",
"content": prompt
}
]
)
else:
return g4f.ChatCompletion.create(
model=model,
messages=[
{
"role": "user",
"content": prompt
}
]
)
def generate_topic(self) -> str:
"""
Generates a topic based on the YouTube Channel niche.
Returns:
topic (str): The generated topic.
"""
completion = self.generate_response(f"Please generate a specific video idea that takes about the following topic: {self.niche}. Make it exactly one sentence. Only return the topic, nothing else.")
if not completion:
error("Failed to generate Topic.")
self.subject = completion
return completion
def generate_script(self) -> str:
"""
Generate a script for a video, depending on the subject of the video, the number of paragraphs, and the AI model.
Returns:
script (str): The script of the video.
"""
prompt = f"""
Generate a script for a video in 4 sentences, depending on the subject of the video.
The script is to be returned as a string with the specified number of paragraphs.
Here is an example of a string:
"This is an example string."
Do not under any circumstance reference this prompt in your response.
Get straight to the point, don't start with unnecessary things like, "welcome to this video".
Obviously, the script should be related to the subject of the video.
YOU MUST NOT EXCEED THE 4 SENTENCES LIMIT. MAKE SURE THE 4 SENTENCES ARE SHORT.
YOU MUST NOT INCLUDE ANY TYPE OF MARKDOWN OR FORMATTING IN THE SCRIPT, NEVER USE A TITLE.
YOU MUST WRITE THE SCRIPT IN THE LANGUAGE SPECIFIED IN [LANGUAGE].
ONLY RETURN THE RAW CONTENT OF THE SCRIPT. DO NOT INCLUDE "VOICEOVER", "NARRATOR" OR SIMILAR INDICATORS OF WHAT SHOULD BE SPOKEN AT THE BEGINNING OF EACH PARAGRAPH OR LINE. YOU MUST NOT MENTION THE PROMPT, OR ANYTHING ABOUT THE SCRIPT ITSELF. ALSO, NEVER TALK ABOUT THE AMOUNT OF PARAGRAPHS OR LINES. JUST WRITE THE SCRIPT
Subject: {self.subject}
Language: {self.language}
"""
completion = self.generate_response(prompt)
# Apply regex to remove *
completion = re.sub(r"\*", "", completion)
if not completion:
error("The generated script is empty.")
return
if len(completion) > 5000:
if get_verbose():
warning("Generated Script is too long. Retrying...")
self.generate_script()
self.script = completion
return completion
def generate_metadata(self) -> dict:
"""
Generates Video metadata for the to-be-uploaded YouTube Short (Title, Description).
Returns:
metadata (dict): The generated metadata.
"""
title = self.generate_response(f"Please generate a YouTube Video Title for the following subject, including hashtags: {self.subject}. Only return the title, nothing else. Limit the title under 100 characters.")
if len(title) > 100:
if get_verbose():
warning("Generated Title is too long. Retrying...")
return self.generate_metadata()
description = self.generate_response(f"Please generate a YouTube Video Description for the following script: {self.script}. Only return the description, nothing else.")
self.metadata = {
"title": title,
"description": description
}
return self.metadata
def generate_prompts(self) -> List[str]:
"""
Generates AI Image Prompts based on the provided Video Script.
Returns:
image_prompts (List[str]): Generated List of image prompts.
"""
# Check if using G4F for image generation
cached_accounts = get_accounts("youtube")
account_config = None
for account in cached_accounts:
if account["id"] == self._account_uuid:
account_config = account
break
# Calculate number of prompts based on script length
base_n_prompts = len(self.script) / 3
# If using G4F, limit to 25 prompts
if account_config and account_config.get("use_g4f", False):
n_prompts = min(base_n_prompts, 25)
else:
n_prompts = base_n_prompts
prompt = f"""
Generate {n_prompts} Image Prompts for AI Image Generation,
depending on the subject of a video.
Subject: {self.subject}
The image prompts are to be returned as
a JSON-Array of strings.
Each search term should consist of a full sentence,
always add the main subject of the video.
Be emotional and use interesting adjectives to make the
Image Prompt as detailed as possible.
YOU MUST ONLY RETURN THE JSON-ARRAY OF STRINGS.
YOU MUST NOT RETURN ANYTHING ELSE.
YOU MUST NOT RETURN THE SCRIPT.
The search terms must be related to the subject of the video.
Here is an example of a JSON-Array of strings:
["image prompt 1", "image prompt 2", "image prompt 3"]
For context, here is the full text:
{self.script}
"""
completion = str(self.generate_response(prompt, model=parse_model(get_image_prompt_llm())))\
.replace("```json", "") \
.replace("```", "")
image_prompts = []
if "image_prompts" in completion:
image_prompts = json.loads(completion)["image_prompts"]
else:
try:
image_prompts = json.loads(completion)
if get_verbose():
info(f" => Generated Image Prompts: {image_prompts}")
except Exception:
if get_verbose():
warning("GPT returned an unformatted response. Attempting to clean...")
# Get everything between [ and ], and turn it into a list
r = re.compile(r"\[.*\]")
image_prompts = r.findall(completion)
if len(image_prompts) == 0:
if get_verbose():
warning("Failed to generate Image Prompts. Retrying...")
return self.generate_prompts()
# Limit prompts to max allowed amount
if account_config and account_config.get("use_g4f", False):
image_prompts = image_prompts[:25]
elif len(image_prompts) > n_prompts:
image_prompts = image_prompts[:int(n_prompts)]
self.image_prompts = image_prompts
success(f"Generated {len(image_prompts)} Image Prompts.")
return image_prompts
def generate_image_g4f(self, prompt: str) -> str:
"""
Generates an AI Image using G4F with SDXL Turbo.
Args:
prompt (str): Reference for image generation
Returns:
path (str): The path to the generated image.
"""
print(f"Generating Image using G4F: {prompt}")
try:
from g4f.client import Client
client = Client()
response = client.images.generate(
model="sdxl-turbo",
prompt=prompt,
response_format="url"
)
if response and response.data and len(response.data) > 0:
# Download image from URL
image_url = response.data[0].url
image_response = requests.get(image_url)
if image_response.status_code == 200:
image_path = os.path.join(ROOT_DIR, ".mp", str(uuid4()) + ".png")
with open(image_path, "wb") as image_file:
image_file.write(image_response.content)
if get_verbose():
info(f" => Downloaded Image from {image_url} to \"{image_path}\"\n")
self.images.append(image_path)
return image_path
else:
if get_verbose():
warning(f"Failed to download image from URL: {image_url}")
return None
else:
if get_verbose():
warning("Failed to generate image using G4F - no data in response")
return None
except Exception as e:
if get_verbose():
warning(f"Failed to generate image using G4F: {str(e)}")
return None
def generate_image_cloudflare(self, prompt: str, worker_url: str) -> str:
"""
Generates an AI Image using Cloudflare worker.
Args:
prompt (str): Reference for image generation
worker_url (str): The Cloudflare worker URL
Returns:
path (str): The path to the generated image.
"""
print(f"Generating Image using Cloudflare: {prompt}")
url = f"{worker_url}?prompt={prompt}&model=sdxl"
response = requests.get(url)
if response.headers.get('content-type') == 'image/png':
image_path = os.path.join(ROOT_DIR, ".mp", str(uuid4()) + ".png")
with open(image_path, "wb") as image_file:
image_file.write(response.content)
if get_verbose():
info(f" => Wrote Image to \"{image_path}\"\n")
self.images.append(image_path)
return image_path
else:
if get_verbose():
warning("Failed to generate image. The response was not a PNG image.")
return None
def generate_image(self, prompt: str) -> str:
"""
Generates an AI Image based on the given prompt.
Args:
prompt (str): Reference for image generation
Returns:
path (str): The path to the generated image.
"""
# Get account config from cache
cached_accounts = get_accounts("youtube")
account_config = None
for account in cached_accounts:
if account["id"] == self._account_uuid:
account_config = account
break
if not account_config:
error("Account configuration not found")
return None
# Check if using G4F or Cloudflare
if account_config.get("use_g4f", False):
return self.generate_image_g4f(prompt)
else:
worker_url = account_config.get("worker_url")
if not worker_url:
error("Cloudflare worker URL not configured for this account")
return None
return self.generate_image_cloudflare(prompt, worker_url)
def generate_script_to_speech(self, tts_instance: TTS) -> str:
"""
Converts the generated script into Speech using CoquiTTS and returns the path to the wav file.
Args:
tts_instance (tts): Instance of TTS Class.
Returns:
path_to_wav (str): Path to generated audio (WAV Format).
"""
path = os.path.join(ROOT_DIR, ".mp", str(uuid4()) + ".wav")
# Clean script, remove every character that is not a word character, a space, a period, a question mark, or an exclamation mark.
self.script = re.sub(r'[^\w\s.?!]', '', self.script)
tts_instance.synthesize(self.script, path)
self.tts_path = path
if get_verbose():
info(f" => Wrote TTS to \"{path}\"")
return path
def add_video(self, video: dict) -> None:
"""
Adds a video to the cache.
Args:
video (dict): The video to add
Returns:
None
"""
videos = self.get_videos()
videos.append(video)
cache = get_youtube_cache_path()
with open(cache, "r") as file:
previous_json = json.loads(file.read())
# Find our account
accounts = previous_json["accounts"]
for account in accounts:
if account["id"] == self._account_uuid:
account["videos"].append(video)
# Commit changes
with open(cache, "w") as f:
f.write(json.dumps(previous_json))
def generate_subtitles(self, audio_path: str) -> str:
"""
Generates subtitles for the audio using AssemblyAI.
Args:
audio_path (str): The path to the audio file.
Returns:
path (str): The path to the generated SRT File.
"""
# Turn the video into audio
aai.settings.api_key = get_assemblyai_api_key()
config = aai.TranscriptionConfig()
transcriber = aai.Transcriber(config=config)
transcript = transcriber.transcribe(audio_path)
subtitles = transcript.export_subtitles_srt()
srt_path = os.path.join(ROOT_DIR, ".mp", str(uuid4()) + ".srt")
with open(srt_path, "w") as file:
file.write(subtitles)
return srt_path
def combine(self) -> str:
"""
Combines everything into the final video.
Returns:
path (str): The path to the generated MP4 File.
"""
combined_image_path = os.path.join(ROOT_DIR, ".mp", str(uuid4()) + ".mp4")
threads = get_threads()
tts_clip = AudioFileClip(self.tts_path)
max_duration = tts_clip.duration
req_dur = max_duration / len(self.images)
# Make a generator that returns a TextClip when called with consecutive
generator = lambda txt: TextClip(
txt,
font=os.path.join(get_fonts_dir(), get_font()),
fontsize=100,
color="#FFFF00",
stroke_color="black",
stroke_width=5,
size=(1080, 1920),
method="caption",
)
print(colored("[+] Combining images...", "blue"))
clips = []
tot_dur = 0
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while tot_dur < max_duration:
for image_path in self.images:
clip = ImageClip(image_path)
clip.duration = req_dur
clip = clip.set_fps(30)
# Not all images are same size,
# so we need to resize them
if round((clip.w/clip.h), 4) < 0.5625:
if get_verbose():
info(f" => Resizing Image: {image_path} to 1080x1920")
clip = crop(clip, width=clip.w, height=round(clip.w/0.5625), \
x_center=clip.w / 2, \
y_center=clip.h / 2)
else:
if get_verbose():
info(f" => Resizing Image: {image_path} to 1920x1080")
clip = crop(clip, width=round(0.5625*clip.h), height=clip.h, \
x_center=clip.w / 2, \
y_center=clip.h / 2)
clip = clip.resize((1080, 1920))
# FX (Fade In)
#clip = clip.fadein(2)
clips.append(clip)
tot_dur += clip.duration
final_clip = concatenate_videoclips(clips)
final_clip = final_clip.set_fps(30)
random_song = choose_random_song()
subtitles_path = self.generate_subtitles(self.tts_path)
# Equalize srt file
equalize_subtitles(subtitles_path, 10)
# Burn the subtitles into the video
subtitles = SubtitlesClip(subtitles_path, generator)
subtitles.set_pos(("center", "center"))
random_song_clip = AudioFileClip(random_song).set_fps(44100)
# Turn down volume
random_song_clip = random_song_clip.fx(afx.volumex, 0.1)
comp_audio = CompositeAudioClip([
tts_clip.set_fps(44100),
random_song_clip
])
final_clip = final_clip.set_audio(comp_audio)
final_clip = final_clip.set_duration(tts_clip.duration)
# Add subtitles
final_clip = CompositeVideoClip([
final_clip,
subtitles
])
final_clip.write_videofile(combined_image_path, threads=threads)
success(f"Wrote Video to \"{combined_image_path}\"")
return combined_image_path
def generate_video(self, tts_instance: TTS) -> str:
"""
Generates a YouTube Short based on the provided niche and language.
Args:
tts_instance (TTS): Instance of TTS Class.
Returns:
path (str): The path to the generated MP4 File.
"""
# Generate the Topic
self.generate_topic()
# Generate the Script
self.generate_script()
# Generate the Metadata
self.generate_metadata()
# Generate the Image Prompts
self.generate_prompts()
# Generate the Images
for prompt in self.image_prompts:
self.generate_image(prompt)
# Generate the TTS
self.generate_script_to_speech(tts_instance)
# Combine everything
path = self.combine()
if get_verbose():
info(f" => Generated Video: {path}")
self.video_path = os.path.abspath(path)
return path
def get_channel_id(self) -> str:
"""
Gets the Channel ID of the YouTube Account.
Returns:
channel_id (str): The Channel ID.
"""
driver = self.browser
driver.get("https://studio.youtube.com")
time.sleep(2)
channel_id = driver.current_url.split("/")[-1]
self.channel_id = channel_id
return channel_id
def upload_video(self) -> bool:
"""
Uploads the video to YouTube.
Returns:
success (bool): Whether the upload was successful or not.
"""
try:
self.get_channel_id()
driver = self.browser
verbose = get_verbose()
# Go to youtube.com/upload
driver.get("https://www.youtube.com/upload")
# Set video file
FILE_PICKER_TAG = "ytcp-uploads-file-picker"
file_picker = driver.find_element(By.TAG_NAME, FILE_PICKER_TAG)
INPUT_TAG = "input"
file_input = file_picker.find_element(By.TAG_NAME, INPUT_TAG)
file_input.send_keys(self.video_path)
# Wait for upload to finish
time.sleep(5)
# Set title
textboxes = driver.find_elements(By.ID, YOUTUBE_TEXTBOX_ID)
title_el = textboxes[0]
description_el = textboxes[-1]
if verbose:
info("\t=> Setting title...")
title_el.click()
time.sleep(1)
title_el.clear()
title_el.send_keys(self.metadata["title"])
if verbose:
info("\t=> Setting description...")
# Set description
time.sleep(10)
description_el.click()
time.sleep(0.5)
description_el.clear()
description_el.send_keys(self.metadata["description"])
time.sleep(0.5)
# Set `made for kids` option
if verbose:
info("\t=> Setting `made for kids` option...")
is_for_kids_checkbox = driver.find_element(By.NAME, YOUTUBE_MADE_FOR_KIDS_NAME)
is_not_for_kids_checkbox = driver.find_element(By.NAME, YOUTUBE_NOT_MADE_FOR_KIDS_NAME)
if not get_is_for_kids():
is_not_for_kids_checkbox.click()
else:
is_for_kids_checkbox.click()
time.sleep(0.5)
# Click next
if verbose:
info("\t=> Clicking next...")
next_button = driver.find_element(By.ID, YOUTUBE_NEXT_BUTTON_ID)
next_button.click()
# Click next again
if verbose:
info("\t=> Clicking next again...")
next_button = driver.find_element(By.ID, YOUTUBE_NEXT_BUTTON_ID)
next_button.click()
# Wait for 2 seconds
time.sleep(2)
# Click next again
if verbose:
info("\t=> Clicking next again...")
next_button = driver.find_element(By.ID, YOUTUBE_NEXT_BUTTON_ID)
next_button.click()
# Set as unlisted
if verbose:
info("\t=> Setting as unlisted...")
radio_button = driver.find_elements(By.XPATH, YOUTUBE_RADIO_BUTTON_XPATH)
radio_button[2].click()
if verbose:
info("\t=> Clicking done button...")
# Click done button
done_button = driver.find_element(By.ID, YOUTUBE_DONE_BUTTON_ID)
done_button.click()
# Wait for 2 seconds
time.sleep(2)
# Get latest video
if verbose:
info("\t=> Getting video URL...")
# Get the latest uploaded video URL
driver.get(f"https://studio.youtube.com/channel/{self.channel_id}/videos/short")
time.sleep(2)
videos = driver.find_elements(By.TAG_NAME, "ytcp-video-row")
first_video = videos[0]
anchor_tag = first_video.find_element(By.TAG_NAME, "a")
href = anchor_tag.get_attribute("href")
if verbose:
info(f"\t=> Extracting video ID from URL: {href}")
video_id = href.split("/")[-2]
# Build URL
url = build_url(video_id)
self.uploaded_video_url = url
if verbose:
success(f" => Uploaded Video: {url}")
# Add video to cache
self.add_video({
"title": self.metadata["title"],
"description": self.metadata["description"],
"url": url,
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
# Close the browser
driver.quit()
return True
except:
self.browser.quit()
return False
def get_videos(self) -> List[dict]:
"""
Gets the uploaded videos from the YouTube Channel.
Returns:
videos (List[dict]): The uploaded videos.
"""
if not os.path.exists(get_youtube_cache_path()):
# Create the cache file
with open(get_youtube_cache_path(), 'w') as file:
json.dump({
"videos": []
}, file, indent=4)
return []
videos = []
# Read the cache file
with open(get_youtube_cache_path(), 'r') as file:
previous_json = json.loads(file.read())
# Find our account
accounts = previous_json["accounts"]
for account in accounts:
if account["id"] == self._account_uuid:
videos = account["videos"]
return videos
import os
import sys
import json
import srt_equalizer
from termcolor import colored
ROOT_DIR = os.path.dirname(sys.path[0])
def assert_folder_structure() -> None:
"""
Make sure that the nessecary folder structure is present.
Returns:
None
"""
# Create the .mp folder
if not os.path.exists(os.path.join(ROOT_DIR, ".mp")):
if get_verbose():
print(colored(f"=> Creating .mp folder at {os.path.join(ROOT_DIR, '.mp')}", "green"))
os.makedirs(os.path.join(ROOT_DIR, ".mp"))
def get_first_time_running() -> bool:
"""
Checks if the program is running for the first time by checking if .mp folder exists.
Returns:
exists (bool): True if the program is running for the first time, False otherwise
"""
return not os.path.exists(os.path.join(ROOT_DIR, ".mp"))
def get_email_credentials() -> dict:
"""
Gets the email credentials from the config file.
Returns:
credentials (dict): The email credentials
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["email"]
def get_verbose() -> bool:
"""
Gets the verbose flag from the config file.
Returns:
verbose (bool): The verbose flag
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["verbose"]
def get_firefox_profile_path() -> str:
"""
Gets the path to the Firefox profile.
Returns:
path (str): The path to the Firefox profile
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["firefox_profile"]
def get_headless() -> bool:
"""
Gets the headless flag from the config file.
Returns:
headless (bool): The headless flag
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["headless"]
def get_model() -> str:
"""
Gets the model from the config file.
Returns:
model (str): The model
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["llm"]
def get_twitter_language() -> str:
"""
Gets the Twitter language from the config file.
Returns:
language (str): The Twitter language
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["twitter_language"]
def get_image_model() -> str:
"""
Gets the Image MOdel from the config file.
Returns:
model (str): The image model
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["image_model"]
def get_threads() -> int:
"""
Gets the amount of threads to use for example when writing to a file with MoviePy.
Returns:
threads (int): Amount of threads
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["threads"]
def get_image_prompt_llm() -> str:
"""
Gets the image prompt for LLM from the config file.
Returns:
prompt (str): The image prompt
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["image_prompt_llm"]
def get_zip_url() -> str:
"""
Gets the URL to the zip file containing the songs.
Returns:
url (str): The URL to the zip file
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["zip_url"]
def get_is_for_kids() -> bool:
"""
Gets the is for kids flag from the config file.
Returns:
is_for_kids (bool): The is for kids flag
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["is_for_kids"]
def get_google_maps_scraper_zip_url() -> str:
"""
Gets the URL to the zip file containing the Google Maps scraper.
Returns:
url (str): The URL to the zip file
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["google_maps_scraper"]
def get_google_maps_scraper_niche() -> str:
"""
Gets the niche for the Google Maps scraper.
Returns:
niche (str): The niche
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["google_maps_scraper_niche"]
def get_scraper_timeout() -> int:
"""
Gets the timeout for the scraper.
Returns:
timeout (int): The timeout
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["scraper_timeout"] or 300
def get_outreach_message_subject() -> str:
"""
Gets the outreach message subject.
Returns:
subject (str): The outreach message subject
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["outreach_message_subject"]
def get_outreach_message_body_file() -> str:
"""
Gets the outreach message body file.
Returns:
file (str): The outreach message body file
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["outreach_message_body_file"]
def get_assemblyai_api_key() -> str:
"""
Gets the AssemblyAI API key.
Returns:
key (str): The AssemblyAI API key
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["assembly_ai_api_key"]
def equalize_subtitles(srt_path: str, max_chars: int = 10) -> None:
"""
Equalizes the subtitles in a SRT file.
Args:
srt_path (str): The path to the SRT file
max_chars (int): The maximum amount of characters in a subtitle
Returns:
None
"""
srt_equalizer.equalize_srt_file(srt_path, srt_path, max_chars)
def get_font() -> str:
"""
Gets the font from the config file.
Returns:
font (str): The font
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["font"]
def get_fonts_dir() -> str:
"""
Gets the fonts directory.
Returns:
dir (str): The fonts directory
"""
return os.path.join(ROOT_DIR, "fonts")
def get_imagemagick_path() -> str:
"""
Gets the path to ImageMagick.
Returns:
path (str): The path to ImageMagick
"""
with open(os.path.join(ROOT_DIR, "config.json"), "r") as file:
return json.load(file)["imagemagick_path"]
"""
This file contains all the constants used in the program.
"""
import g4f
TWITTER_TEXTAREA_CLASS = "public-DraftStyleDefault-block public-DraftStyleDefault-ltr"
TWITTER_POST_BUTTON_XPATH = "/html/body/div[1]/div/div/div[2]/main/div/div/div/div[1]/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/div[3]"
OPTIONS = [
"YouTube Shorts Automation",
"Twitter Bot",
"Affiliate Marketing",
"Outreach",
"Quit"
]
TWITTER_OPTIONS = [
"Post something",
"Show all Posts",
"Setup CRON Job",
"Quit"
]
TWITTER_CRON_OPTIONS = [
"Once a day",
"Twice a day",
"Thrice a day",
"Quit"
]
YOUTUBE_OPTIONS = [
"Upload Short",
"Show all Shorts",
"Setup CRON Job",
"Quit"
]
YOUTUBE_CRON_OPTIONS = [
"Once a day",
"Twice a day",
"Thrice a day",
"Quit"
]
# YouTube Section
YOUTUBE_TEXTBOX_ID = "textbox"
YOUTUBE_MADE_FOR_KIDS_NAME = "VIDEO_MADE_FOR_KIDS_MFK"
YOUTUBE_NOT_MADE_FOR_KIDS_NAME = "VIDEO_MADE_FOR_KIDS_NOT_MFK"
YOUTUBE_NEXT_BUTTON_ID = "next-button"
YOUTUBE_RADIO_BUTTON_XPATH = "//*[@id=\"radioLabel\"]"
YOUTUBE_DONE_BUTTON_ID = "done-button"
# Amazon Section (AFM)$
AMAZON_PRODUCT_TITLE_ID = "productTitle"
AMAZON_FEATURE_BULLETS_ID = "feature-bullets"
def parse_model(model_name: str) -> any:
"""Retrieve a model object based on the provided model name.
Args:
model_name (str): The name of the model to retrieve. Supported names are
"gpt4", "gpt35_turbo", "llama2_7b", "llama2_13b", "llama2_70b", and "mixtral_8x7b".
Returns:
any: The corresponding model object from the `g4f.models` module. If the
model name is not recognized, defaults to returning the "gpt35_turbo" model."""
if model_name == "gpt4":
return g4f.models.gpt_4
elif model_name == "gpt35_turbo":
return g4f.models.gpt_4o_mini
elif model_name == "llama2_7b":
return g4f.models.llama2_7b
elif model_name == "llama2_13b":
return g4f.models.llama2_13b
elif model_name == "llama2_70b":
return g4f.models.llama2_70b
elif model_name == "mixtral_8x7b":
return g4f.models.mixtral_8x7b
else:
# Default model is gpt3.5-turbo
return g4f.models.gpt_4o_mini
# RUN THIS N AMOUNT OF TIMES
import sys
from status import *
from cache import get_accounts
from config import get_verbose
from classes.Tts import TTS
from classes.Twitter import Twitter
from classes.YouTube import YouTube
def main():
"""Main function to post content to Twitter or upload videos to YouTube.
This function determines its operation based on command-line arguments:
- If the purpose is "twitter", it initializes a Twitter account and posts a message.
- If the purpose is "youtube", it initializes a YouTube account, generates a video with TTS, and uploads it.
Command-line arguments:
sys.argv[1]: A string indicating the purpose, either "twitter" or "youtube".
sys.argv[2]: A string representing the account UUID.
The function also handles verbose output based on user settings and reports success or errors as appropriate.
Args:
None. The function uses command-line arguments accessed via sys.argv.
Returns:
None. The function performs operations based on the purpose and account UUID and does not return any value."""
purpose = str(sys.argv[1])
account_id = str(sys.argv[2])
verbose = get_verbose()
if purpose == "twitter":
accounts = get_accounts("twitter")
if not account_id:
error("Account UUID cannot be empty.")
for acc in accounts:
if acc["id"] == account_id:
if verbose:
info("Initializing Twitter...")
twitter = Twitter(
acc["id"],
acc["nickname"],
acc["firefox_profile"],
acc["topic"]
)
twitter.post()
if verbose:
success("Done posting.")
break
elif purpose == "youtube":
tts = TTS()
accounts = get_accounts("youtube")
if not account_id:
error("Account UUID cannot be empty.")
for acc in accounts:
if acc["id"] == account_id:
if verbose:
info("Initializing YouTube...")
youtube = YouTube(
acc["id"],
acc["nickname"],
acc["firefox_profile"],
acc["niche"],
acc["language"]
)
youtube.generate_video(tts)
youtube.upload_video()
if verbose:
success("Uploaded Short.")
break
else:
error("Invalid Purpose, exiting...")
sys.exit(1)
if __name__ == "__main__":
main()
import schedule
import subprocess
from art import *
from cache import *
from utils import *
from config import *
from status import *
from uuid import uuid4
from constants import *
from classes.Tts import TTS
from termcolor import colored
from classes.Twitter import Twitter
from classes.YouTube import YouTube
from prettytable import PrettyTable
from classes.Outreach import Outreach
from classes.AFM import AffiliateMarketing
def main():
"""Main entry point for the application, providing a menu-driven interface
to manage YouTube, Twitter bots, Affiliate Marketing, and Outreach tasks.
This function allows users to:
1. Start the YouTube Shorts Automater to manage YouTube accounts,
generate and upload videos, and set up CRON jobs.
2. Start a Twitter Bot to manage Twitter accounts, post tweets, and
schedule posts using CRON jobs.
3. Manage Affiliate Marketing by creating pitches and sharing them via
Twitter accounts.
4. Initiate an Outreach process for engagement and promotion tasks.
5. Exit the application.
The function continuously prompts users for input, validates it, and
executes the selected option until the user chooses to quit.
Args:
None
Returns:
None"""
# Get user input
# user_input = int(question("Select an option: "))
valid_input = False
while not valid_input:
try:
# Show user options
info("\n============ OPTIONS ============", False)
for idx, option in enumerate(OPTIONS):
print(colored(f" {idx + 1}. {option}", "cyan"))
info("=================================\n", False)
user_input = input("Select an option: ").strip()
if user_input == '':
print("\n" * 100)
raise ValueError("Empty input is not allowed.")
user_input = int(user_input)
valid_input = True
except ValueError as e:
print("\n" * 100)
print(f"Invalid input: {e}")
# Start the selected option
if user_input == 1:
info("Starting YT Shorts Automater...")
cached_accounts = get_accounts("youtube")
if len(cached_accounts) == 0:
warning("No accounts found in cache. Create one now?")
user_input = question("Yes/No: ")
if user_input.lower() == "yes":
generated_uuid = str(uuid4())
success(f" => Generated ID: {generated_uuid}")
nickname = question(" => Enter a nickname for this account: ")
fp_profile = question(" => Enter the path to the Firefox profile: ")
niche = question(" => Enter the account niche: ")
language = question(" => Enter the account language: ")
# Add image generation options
info("\n============ IMAGE GENERATION ============", False)
print(colored(" 1. G4F (SDXL Turbo)", "cyan"))
print(colored(" 2. Cloudflare Worker", "cyan"))
info("=======================================", False)
print(colored("\nRecommendation: If you're unsure, select G4F (Option 1) as there's no additional setup", "yellow"))
info("=======================================\n", False)
image_gen_choice = question(" => Select image generation method (1/2): ")
account_data = {
"id": generated_uuid,
"nickname": nickname,
"firefox_profile": fp_profile,
"niche": niche,
"language": language,
"use_g4f": image_gen_choice == "1",
"videos": []
}
if image_gen_choice == "2":
worker_url = question(" => Enter your Cloudflare worker URL for image generation: ")
account_data["worker_url"] = worker_url
add_account("youtube", account_data)
success("Account configured successfully!")
else:
table = PrettyTable()
table.field_names = ["ID", "UUID", "Nickname", "Niche"]
for account in cached_accounts:
table.add_row([cached_accounts.index(account) + 1, colored(account["id"], "cyan"), colored(account["nickname"], "blue"), colored(account["niche"], "green")])
print(table)
user_input = question("Select an account to start: ")
selected_account = None
for account in cached_accounts:
if str(cached_accounts.index(account) + 1) == user_input:
selected_account = account
if selected_account is None:
error("Invalid account selected. Please try again.", "red")
main()
else:
youtube = YouTube(
selected_account["id"],
selected_account["nickname"],
selected_account["firefox_profile"],
selected_account["niche"],
selected_account["language"]
)
while True:
rem_temp_files()
info("\n============ OPTIONS ============", False)
for idx, youtube_option in enumerate(YOUTUBE_OPTIONS):
print(colored(f" {idx + 1}. {youtube_option}", "cyan"))
info("=================================\n", False)
# Get user input
user_input = int(question("Select an option: "))
tts = TTS()
if user_input == 1:
youtube.generate_video(tts)
upload_to_yt = question("Do you want to upload this video to YouTube? (Yes/No): ")
if upload_to_yt.lower() == "yes":
youtube.upload_video()
elif user_input == 2:
videos = youtube.get_videos()
if len(videos) > 0:
videos_table = PrettyTable()
videos_table.field_names = ["ID", "Date", "Title"]
for video in videos:
videos_table.add_row([
videos.index(video) + 1,
colored(video["date"], "blue"),
colored(video["title"][:60] + "...", "green")
])
print(videos_table)
else:
warning(" No videos found.")
elif user_input == 3:
info("How often do you want to upload?")
info("\n============ OPTIONS ============", False)
for idx, cron_option in enumerate(YOUTUBE_CRON_OPTIONS):
print(colored(f" {idx + 1}. {cron_option}", "cyan"))
info("=================================\n", False)
user_input = int(question("Select an Option: "))
cron_script_path = os.path.join(ROOT_DIR, "src", "cron.py")
command = f"python {cron_script_path} youtube {selected_account['id']}"
def job():
"""Executes a shell command using subprocess.run.
This function runs a specified shell command using the subprocess module.
The command to be executed should be defined in the 'command' variable.
Args:
None
Returns:
None"""
subprocess.run(command)
if user_input == 1:
# Upload Once
schedule.every(1).day.do(job)
success("Set up CRON Job.")
elif user_input == 2:
# Upload Twice a day
schedule.every().day.at("10:00").do(job)
schedule.every().day.at("16:00").do(job)
success("Set up CRON Job.")
else:
break
elif user_input == 4:
if get_verbose():
info(" => Climbing Options Ladder...", False)
break
elif user_input == 2:
info("Starting Twitter Bot...")
cached_accounts = get_accounts("twitter")
if len(cached_accounts) == 0:
warning("No accounts found in cache. Create one now?")
user_input = question("Yes/No: ")
if user_input.lower() == "yes":
generated_uuid = str(uuid4())
success(f" => Generated ID: {generated_uuid}")
nickname = question(" => Enter a nickname for this account: ")
fp_profile = question(" => Enter the path to the Firefox profile: ")
topic = question(" => Enter the account topic: ")
add_account("twitter", {
"id": generated_uuid,
"nickname": nickname,
"firefox_profile": fp_profile,
"topic": topic,
"posts": []
})
else:
table = PrettyTable()
table.field_names = ["ID", "UUID", "Nickname", "Account Topic"]
for account in cached_accounts:
table.add_row([cached_accounts.index(account) + 1, colored(account["id"], "cyan"), colored(account["nickname"], "blue"), colored(account["topic"], "green")])
print(table)
user_input = question("Select an account to start: ")
selected_account = None
for account in cached_accounts:
if str(cached_accounts.index(account) + 1) == user_input:
selected_account = account
if selected_account is None:
error("Invalid account selected. Please try again.", "red")
main()
else:
twitter = Twitter(selected_account["id"], selected_account["nickname"], selected_account["firefox_profile"], selected_account["topic"])
while True:
info("\n============ OPTIONS ============", False)
for idx, twitter_option in enumerate(TWITTER_OPTIONS):
print(colored(f" {idx + 1}. {twitter_option}", "cyan"))
info("=================================\n", False)
# Get user input
user_input = int(question("Select an option: "))
if user_input == 1:
twitter.post()
elif user_input == 2:
posts = twitter.get_posts()
posts_table = PrettyTable()
posts_table.field_names = ["ID", "Date", "Content"]
for post in posts:
posts_table.add_row([
posts.index(post) + 1,
colored(post["date"], "blue"),
colored(post["content"][:60] + "...", "green")
])
print(posts_table)
elif user_input == 3:
info("How often do you want to post?")
info("\n============ OPTIONS ============", False)
for idx, cron_option in enumerate(TWITTER_CRON_OPTIONS):
print(colored(f" {idx + 1}. {cron_option}", "cyan"))
info("=================================\n", False)
user_input = int(question("Select an Option: "))
cron_script_path = os.path.join(ROOT_DIR, "src", "cron.py")
command = f"python {cron_script_path} twitter {selected_account['id']}"
def job():
"""Executes a shell command using subprocess.run.
This function runs a specified shell command using the subprocess module.
The command to be executed should be defined in the 'command' variable.
Args:
None
Returns:
None"""
subprocess.run(command)
if user_input == 1:
# Post Once a day
schedule.every(1).day.do(job)
success("Set up CRON Job.")
elif user_input == 2:
# Post twice a day
schedule.every().day.at("10:00").do(job)
schedule.every().day.at("16:00").do(job)
success("Set up CRON Job.")
elif user_input == 3:
# Post thrice a day
schedule.every().day.at("08:00").do(job)
schedule.every().day.at("12:00").do(job)
schedule.every().day.at("18:00").do(job)
success("Set up CRON Job.")
else:
break
elif user_input == 4:
if get_verbose():
info(" => Climbing Options Ladder...", False)
break
elif user_input == 3:
info("Starting Affiliate Marketing...")
cached_products = get_products()
if len(cached_products) == 0:
warning("No products found in cache. Create one now?")
user_input = question("Yes/No: ")
if user_input.lower() == "yes":
affiliate_link = question(" => Enter the affiliate link: ")
twitter_uuid = question(" => Enter the Twitter Account UUID: ")
# Find the account
account = None
for acc in get_accounts("twitter"):
if acc["id"] == twitter_uuid:
account = acc
add_product({
"id": str(uuid4()),
"affiliate_link": affiliate_link,
"twitter_uuid": twitter_uuid
})
afm = AffiliateMarketing(affiliate_link, account["firefox_profile"], account["id"], account["nickname"], account["topic"])
afm.generate_pitch()
afm.share_pitch("twitter")
else:
table = PrettyTable()
table.field_names = ["ID", "Affiliate Link", "Twitter Account UUID"]
for product in cached_products:
table.add_row([cached_products.index(product) + 1, colored(product["affiliate_link"], "cyan"), colored(product["twitter_uuid"], "blue")])
print(table)
user_input = question("Select a product to start: ")
selected_product = None
for product in cached_products:
if str(cached_products.index(product) + 1) == user_input:
selected_product = product
if selected_product is None:
error("Invalid product selected. Please try again.", "red")
main()
else:
# Find the account
account = None
for acc in get_accounts("twitter"):
if acc["id"] == selected_product["twitter_uuid"]:
account = acc
afm = AffiliateMarketing(selected_product["affiliate_link"], account["firefox_profile"], account["id"], account["nickname"], account["topic"])
afm.generate_pitch()
afm.share_pitch("twitter")
elif user_input == 4:
info("Starting Outreach...")
outreach = Outreach()
outreach.start()
elif user_input == 5:
if get_verbose():
print(colored(" => Quitting...", "blue"))
sys.exit(0)
else:
error("Invalid option selected. Please try again.", "red")
main()
if __name__ == "__main__":
# Print ASCII Banner
print_banner()
first_time = get_first_time_running()
if first_time:
print(colored("Hey! It looks like you're running MoneyPrinter V2 for the first time. Let's get you setup first!", "yellow"))
# Setup file tree
assert_folder_structure()
# Remove temporary files
rem_temp_files()
# Fetch MP3 Files
fetch_songs()
while True:
main()
from termcolor import colored
def error(message: str, show_emoji: bool = True) -> None:
"""
Prints an error message.
Args:
message (str): The error message
show_emoji (bool): Whether to show the emoji
Returns:
None
"""
emoji = "❌" if show_emoji else ""
print(colored(f"{emoji} {message}", "red"))
def success(message: str, show_emoji: bool = True) -> None:
"""
Prints a success message.
Args:
message (str): The success message
show_emoji (bool): Whether to show the emoji
Returns:
None
"""
emoji = "✅" if show_emoji else ""
print(colored(f"{emoji} {message}", "green"))
def info(message: str, show_emoji: bool = True) -> None:
"""
Prints an info message.
Args:
message (str): The info message
show_emoji (bool): Whether to show the emoji
Returns:
None
"""
emoji = "ℹ️" if show_emoji else ""
print(colored(f"{emoji} {message}", "magenta"))
def warning(message: str, show_emoji: bool = True) -> None:
"""
Prints a warning message.
Args:
message (str): The warning message
show_emoji (bool): Whether to show the emoji
Returns:
None
"""
emoji = "⚠️" if show_emoji else ""
print(colored(f"{emoji} {message}", "yellow"))
def question(message: str, show_emoji: bool = True) -> str:
"""
Prints a question message and returns the user's input.
Args:
message (str): The question message
show_emoji (bool): Whether to show the emoji
Returns:
user_input (str): The user's input
"""
emoji = "❓" if show_emoji else ""
return input(colored(f"{emoji} {message}", "magenta"))
import os
import random
import zipfile
import requests
import platform
from status import *
from config import *
def close_running_selenium_instances() -> None:
"""
Closes any running Selenium instances.
Returns:
None
"""
try:
info(" => Closing running Selenium instances...")
# Kill all running Firefox instances
if platform.system() == "Windows":
os.system("taskkill /f /im firefox.exe")
else:
os.system("pkill firefox")
success(" => Closed running Selenium instances.")
except Exception as e:
error(f"Error occurred while closing running Selenium instances: {str(e)}")
def build_url(youtube_video_id: str) -> str:
"""
Builds the URL to the YouTube video.
Args:
youtube_video_id (str): The YouTube video ID.
Returns:
url (str): The URL to the YouTube video.
"""
return f"https://www.youtube.com/watch?v={youtube_video_id}"
def rem_temp_files() -> None:
"""
Removes temporary files in the `.mp` directory.
Returns:
None
"""
# Path to the `.mp` directory
mp_dir = os.path.join(ROOT_DIR, ".mp")
files = os.listdir(mp_dir)
for file in files:
if not file.endswith(".json"):
os.remove(os.path.join(mp_dir, file))
def fetch_songs() -> None:
"""
Downloads songs into songs/ directory to use with geneated videos.
Returns:
None
"""
try:
info(f" => Fetching songs...")
files_dir = os.path.join(ROOT_DIR, "Songs")
if not os.path.exists(files_dir):
os.mkdir(files_dir)
if get_verbose():
info(f" => Created directory: {files_dir}")
else:
# Skip if songs are already downloaded
return
# Download songs
response = requests.get(get_zip_url() or "https://filebin.net/bb9ewdtckolsf3sg/drive-download-20240209T180019Z-001.zip")
# Save the zip file
with open(os.path.join(files_dir, "songs.zip"), "wb") as file:
file.write(response.content)
# Unzip the file
with zipfile.ZipFile(os.path.join(files_dir, "songs.zip"), "r") as file:
file.extractall(files_dir)
# Remove the zip file
os.remove(os.path.join(files_dir, "songs.zip"))
success(" => Downloaded Songs to ../Songs.")
except Exception as e:
error(f"Error occurred while fetching songs: {str(e)}")
def choose_random_song() -> str:
"""
Chooses a random song from the songs/ directory.
Returns:
str: The path to the chosen song.
"""
try:
songs = os.listdir(os.path.join(ROOT_DIR, "Songs"))
song = random.choice(songs)
success(f" => Chose song: {song}")
return os.path.join(ROOT_DIR, "Songs", song)
except Exception as e:
error(f"Error occurred while choosing random song: {str(e)}")