Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions ig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Ultroid - UserBot
# Instagram Downloader Plugin
#
# This file is part of < https://github.com/TeamUltroid/Ultroid/ >
# Please read the GNU Affero General Public License in
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>.

"""
✘ Commands Available -

• `{i}ig <instagram link>`
Download reels/videos/photos/carousels from Instagram.

"""

import glob
import os
import time
import asyncio
from yt_dlp import YoutubeDL

from pyUltroid import LOGS
from pyUltroid.fns.helper import humanbytes, run_async, time_formatter
from pyUltroid.fns.tools import set_attributes
from . import ultroid_cmd
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module is added at the repo root, but Ultroid’s Loader only auto-imports plugin modules from the plugins/ directory; additionally, from . import ultroid_cmd will fail unless the file is inside the plugins package. Please move/rename this to plugins/instagram.py (or similar) and keep imports consistent with other plugins so it’s actually loaded at startup.

Suggested change
from . import ultroid_cmd
from plugins import ultroid_cmd

Copilot uses AI. Check for mistakes.


# ----------- Progress Handler -----------

async def ig_progress(data, start_time, event):
if data["status"] == "error":
return await event.edit("**[X] Error while downloading.**")

if data["status"] == "downloading":
txt = (
"**Downloading from Instagram...**\n\n"
f"**File:** `{data.get('filename','')}`\n"
f"**Total:** `{humanbytes(data.get('total_bytes', 0))}`\n"
f"**Done:** `{humanbytes(data.get('downloaded_bytes', 0))}`\n"
f"**Speed:** `{humanbytes(data.get('speed', 0))}/s`\n"
f"**ETA:** `{time_formatter(data.get('eta', 0) * 1000)}`"
)

# update every 10 seconds
if round((time.time() - start_time) % 10) == 0:
try:
await event.edit(txt)
except:
pass


# ----------- Instagram Extraction -----------

@run_async
def _download_ig(url, opts):
try:
return YoutubeDL(opts).extract_info(url=url, download=True)
except Exception as e:
LOGS.error(f"IG Download Error: {e}")
return None


# ----------- IG Handler -----------

async def insta_downloader(event, url):
msg = await event.eor("`Fetching Instagram media...`")
reply_to = event.reply_to_msg_id or event

opts = {
"quiet": True,
"prefer_ffmpeg": True,
"geo-bypass": True,
"nocheckcertificate": True,
"outtmpl": "%(id)s.%(ext)s",
"progress_hooks": [lambda d: asyncio.create_task(ig_progress(d, time.time(), msg))],
Comment on lines +69 to +75
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

progress_hooks is invoked from the yt-dlp download thread (because _download_ig uses @run_async), so calling asyncio.create_task(...) here will raise RuntimeError: no running event loop and/or spawn an unbounded number of tasks. Capture the main loop once (e.g., loop = asyncio.get_running_loop() and a single start_time) and use a thread-safe scheduling approach (e.g., asyncio.run_coroutine_threadsafe) with proper rate-limiting.

Suggested change
opts = {
"quiet": True,
"prefer_ffmpeg": True,
"geo-bypass": True,
"nocheckcertificate": True,
"outtmpl": "%(id)s.%(ext)s",
"progress_hooks": [lambda d: asyncio.create_task(ig_progress(d, time.time(), msg))],
# Capture the main event loop and a single start time for this download session
loop = asyncio.get_running_loop()
start_time = time.time()
# Simple rate-limiting state to avoid flooding the loop with tasks
_rate_limit_state = {"last_update": 0.0}
def progress_hook(d):
"""
Thread-safe progress hook called by yt-dlp from its download thread.
Schedules ig_progress on the main asyncio loop with basic rate limiting.
"""
now = time.time()
# Allow an update at most once per second
if now - _rate_limit_state["last_update"] < 1.0:
return
_rate_limit_state["last_update"] = now
try:
asyncio.run_coroutine_threadsafe(
ig_progress(d, start_time, msg),
loop,
)
except RuntimeError:
# Event loop may be closed; ignore in that case
pass
opts = {
"quiet": True,
"prefer_ffmpeg": True,
"geo-bypass": True,
"nocheckcertificate": True,
"outtmpl": "%(id)s.%(ext)s",
"progress_hooks": [progress_hook],

Copilot uses AI. Check for mistakes.
}

info = await _download_ig(url, opts)
if not info:
return await msg.edit("**[X] Failed to fetch Instagram media.**")

# Playlist = carousel
if info.get("_type") == "playlist":
entries = info.get("entries", [])
else:
entries = [info]

await msg.edit(f"**Downloading {len(entries)} media...**")

for idx, media in enumerate(entries, start=1):
media_id = media.get("id")
title = media.get("title") or "Instagram_Media"

if len(title) > 30:
title = title[:27] + "..."

# Find downloaded file from yt-dlp
media_path = None
for f in glob.glob(f"{media_id}*"):
if not f.endswith(".jpg"):
media_path = f
break

if not media_path:
Comment on lines +98 to +104
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file-selection logic skips any downloaded .jpg file, which means Instagram photo posts (and carousels containing photos) will be silently ignored because the only downloaded media may be a .jpg. Instead of excluding .jpg, use yt-dlp’s returned filename fields (e.g., _filename/requested_downloads) or select by the entry’s expected extension so photos are handled correctly.

Suggested change
media_path = None
for f in glob.glob(f"{media_id}*"):
if not f.endswith(".jpg"):
media_path = f
break
if not media_path:
media_path = media.get("_filename")
# Fallback: construct from id/ext using outtmpl "%(id)s.%(ext)s"
if not media_path:
ext = media.get("ext")
if ext:
candidate = f"{media_id}.{ext}"
if os.path.exists(candidate):
media_path = candidate
# Last resort: any file starting with the media_id (no .jpg exclusion)
if not media_path:
matches = glob.glob(f"{media_id}.*")
if matches:
media_path = matches[0]
if not media_path or not os.path.exists(media_path):

Copilot uses AI. Check for mistakes.
continue

# Rename file
ext = "." + media_path.split(".")[-1]
final_name = f"{title}{ext}"

try:
Comment on lines +92 to +111
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

title comes from remote metadata and is used directly to build final_name. If the title contains path separators (e.g., ../, /, \) or other special characters, os.rename may write outside the working dir or fail unpredictably. Please sanitize to a safe basename (strip separators / reserved chars) before constructing the output filename.

Copilot uses AI. Check for mistakes.
os.rename(media_path, final_name)
except:
final_name = media_path

attributes = await set_attributes(final_name)

uploaded, _ = await event.client.fast_uploader(
final_name, show_progress=True, event=msg, to_delete=True
)

await event.client.send_file(
event.chat_id,
file=uploaded,
caption=f"**Instagram Media**\n`[{idx}/{len(entries)}]`",
attributes=attributes,
supports_streaming=True,
reply_to=reply_to,
)

await msg.edit("**Instagram Download Complete.**")


# ----------- Command Handler -----------

@ultroid_cmd(
pattern="ig ?(.*)",
category="Media"
)
async def _insta_cmd(event):
url = event.pattern_match.group(1).strip()

if not url:
return await event.eor("**Usage:** `.ig <instagram link>`")

if "instagram.com" not in url:
return await event.eor("**[X] Invalid Instagram link.**")

await insta_downloader(event, url)
Loading