Skip to content

Commit ade05a5

Browse files
authored
Added RSS watcher (#8429)
2 parents 39cac16 + d01d5a0 commit ade05a5

18 files changed

+554
-8
lines changed

src/tribler/core/components.py

+28
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,34 @@ def get_endpoints(self) -> list[RESTEndpoint]:
289289
return [*super().get_endpoints(), RecommenderEndpoint()]
290290

291291

292+
@precondition('session.config.get("rss/enabled")')
class RSSComponent(ComponentLauncher):
    """
    Launcher that starts the RSS watchers and exposes them through the REST API.
    """

    def finalize(self, ipv8: IPv8, session: Session, community: Community) -> None:
        """
        Create and start the RSS watcher manager, then attach it to the RSS REST endpoint.

        :param ipv8: the IPv8 instance (not used here).
        :param session: the session that supplies the notifier, the configuration and the REST manager.
        :param community: passed to the watcher manager as its task manager.
        """
        from tribler.core.rss.rss import RSSWatcherManager

        configured_urls = session.config.get("rss/urls")
        rss_manager = RSSWatcherManager(community, session.notifier, configured_urls)
        rss_manager.start()

        # The endpoint was registered without its dependencies: fill them in now that they exist.
        rss_endpoint = session.rest_manager.get_endpoint("/api/rss")
        rss_endpoint.manager = rss_manager
        rss_endpoint.config = session.config

    def get_endpoints(self) -> list[RESTEndpoint]:
        """
        Return the endpoints of the parent launcher, followed by the RSS endpoint.
        """
        from tribler.core.rss.restapi.endpoint import RSSEndpoint

        inherited_endpoints = super().get_endpoints()
        return [*inherited_endpoints, RSSEndpoint()]
318+
319+
292320
@set_in_session("tunnel_community")
293321
@precondition('session.config.get("tunnel_community/enabled")')
294322
@after("DHTDiscoveryComponent")

src/tribler/core/rss/__init__.py

Whitespace-only changes.

src/tribler/core/rss/restapi/__init__.py

Whitespace-only changes.
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING
4+
5+
from aiohttp import web
6+
from aiohttp_apispec import docs, json_schema
7+
from ipv8.REST.schema import schema
8+
from marshmallow.fields import Boolean, String
9+
10+
from tribler.core.restapi.rest_endpoint import RESTEndpoint, RESTResponse
11+
12+
if TYPE_CHECKING:
13+
from typing_extensions import TypeAlias
14+
15+
from tribler.core.restapi.rest_manager import TriblerRequest
16+
from tribler.core.rss.rss import RSSWatcherManager
17+
from tribler.tribler_config import TriblerConfigManager
18+
19+
RequestType: TypeAlias = TriblerRequest[tuple[RSSWatcherManager, TriblerConfigManager]]
20+
21+
22+
class RSSEndpoint(RESTEndpoint):
    """
    This endpoint allows the list of watched RSS feeds to be replaced.
    """

    path = "/api/rss"

    def __init__(self) -> None:
        """
        Create a new endpoint to update the registered RSS feeds.

        The manager and config start out as ``None`` and have to be assigned from the outside.
        """
        super().__init__()

        self.manager: RSSWatcherManager | None = None
        self.config: TriblerConfigManager | None = None
        # The order of these names is the order of the entries in ``request.context``.
        self.required_components = ("manager", "config")

        routes = [web.put("", self.update_feeds)]
        self.app.add_routes(routes)

    @docs(
        tags=["RSS"],
        summary="Set the current RSS feeds.",
        parameters=[],
        responses={
            200: {
                "schema": schema(RSSResponse={"modified": Boolean}),
                "examples": {"modified": True},
            }
        },
    )
    @json_schema(schema(RSSFeeds={"urls": ([String], "the RSS URLs to listen to")}))
    async def update_feeds(self, request: RequestType) -> RESTResponse:
        """
        Set the current RSS feeds.

        The JSON body must contain a ``urls`` entry. It is handed to the watcher manager first and
        is then written to the ``rss/urls`` configuration key.
        """
        payload = await request.json()
        feed_urls = payload["urls"]

        watcher_manager = request.context[0]  # context[0] = self.manager
        watcher_manager.update(feed_urls)

        config_manager = request.context[1]  # context[1] = self.config
        config_manager.set("rss/urls", feed_urls)

        return RESTResponse({"modified": True})

src/tribler/core/rss/rss.py

+205
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
from __future__ import annotations
2+
3+
import contextlib
4+
import logging
5+
from asyncio.exceptions import TimeoutError as AsyncTimeoutError
6+
from email.utils import formatdate, parsedate
7+
from io import BytesIO
8+
from ssl import SSLError
9+
from time import mktime, time
10+
from typing import TYPE_CHECKING
11+
from xml.etree import ElementTree as ET
12+
from xml.etree.ElementTree import ParseError
13+
14+
import libtorrent
15+
from aiohttp import ClientConnectorError, ClientResponseError, ClientSession, ServerConnectionError
16+
from aiohttp.web_exceptions import HTTPNotModified, HTTPOk
17+
18+
from tribler.core.database.orm_bindings.torrent_metadata import tdef_to_metadata_dict
19+
from tribler.core.libtorrent.restapi.torrentinfo_endpoint import query_uri
20+
from tribler.core.libtorrent.torrentdef import TorrentDef
21+
from tribler.core.libtorrent.uris import unshorten
22+
from tribler.core.notifier import Notification, Notifier
23+
24+
if TYPE_CHECKING:
25+
from http.cookies import SimpleCookie
26+
27+
from aiohttp import ClientResponse
28+
from ipv8.taskmanager import TaskManager
29+
30+
logger = logging.getLogger(__name__)
31+
32+
33+
class RSSWatcher:
    """
    Watch a single RSS URL and send a notification for every new torrent that it lists.
    """

    # Seconds to wait before the next request when the server did not specify a back-off itself.
    DEFAULT_BACKOFF = 120

    def __init__(self, task_manager: TaskManager, notifier: Notifier, url: str) -> None:
        """
        Initialize (but don't start) with a given taskmanager, notifier and url.

        :param task_manager: the task manager on which the periodic check gets registered.
        :param notifier: the notifier that is told about newly found torrent metadata.
        :param url: the RSS URL to poll.
        """
        super().__init__()

        self.url = url
        self.previous_entries: set[str] = set()

        self.cookies: SimpleCookie | None = None
        self.last_modified: str | None = None
        self.next_check: float = 0.0

        self.task_manager = task_manager
        self.notifier = notifier

        self.running: bool = False

    def start(self) -> None:
        """
        Start periodically querying our URL.
        """
        task = self.task_manager.register_task(f"RSS watcher for {self.url}", self.check, interval=60.0)
        # NOTE(review): presumably ``get_name`` is only available if a real task got scheduled - confirm.
        self.running = hasattr(task, "get_name")

    def stop(self) -> None:
        """
        Stop periodically querying our URL.
        """
        if self.running:
            self.task_manager.cancel_pending_task(f"RSS watcher for {self.url}")
            # Without this reset a second ``stop()`` would try to cancel the task again.
            self.running = False

    async def resolve(self, urls: set[str]) -> None:
        """
        Download the torrent files and send a notification with the metadata of each of them.

        A URL that fails is logged and skipped, the remaining URLs are still processed.

        :param urls: the URLs of the ``.torrent`` files to fetch.
        """
        for url in urls:
            try:
                uri = await unshorten(url)
                response = await query_uri(uri, valid_cert=False)
            except (ServerConnectionError, ClientResponseError, SSLError, ClientConnectorError,
                    AsyncTimeoutError, ValueError) as e:
                logger.warning("Error while querying http uri: %s", str(e))
                continue

            try:
                metainfo = libtorrent.bdecode(response)
            except RuntimeError as e:
                logger.warning("Error while reading http uri response: %s", str(e))
                continue

            # ``parse_rss`` has already stored all of these URLs as seen, so an entry that we skip
            # because of a crash on an earlier entry would never be retried: isolate the failures.
            try:
                torrent_def = TorrentDef.load_from_dict(metainfo)
                metadata_dict = tdef_to_metadata_dict(torrent_def)
            except Exception:
                logger.exception("Error while loading the torrent of http uri: %s", url)
                continue

            self.notifier.notify(Notification.torrent_metadata_added, metadata=metadata_dict)

    async def conditional_get(self, last_modified_time: float) -> tuple[ClientResponse, bytes]:
        """
        Send a conditional get to our URL and return the response and its raw content.

        :param last_modified_time: seconds since the epoch (UTC) to send as ``If-Modified-Since``.
        :returns: the response object and the body that was read from it.
        """
        headers = {"If-Modified-Since": formatdate(timeval=last_modified_time, localtime=False, usegmt=True)}
        async with ClientSession(None, headers=headers) as session, \
                session.get(self.url, cookies=self.cookies) as response:
            return response, await response.read()

    @staticmethod
    def _http_date_to_timestamp(value: str | None) -> float:
        """
        Convert the value of an HTTP date header to seconds since the epoch (UTC).

        :param value: the raw header value, or ``None`` if we did not store one yet.
        :returns: the timestamp, or 0 if there is no value or it cannot be converted.
        """
        from calendar import timegm
        from email.utils import parsedate_tz

        parsed_date = parsedate_tz(value) if value else None
        if parsed_date is None:
            return 0.0
        try:
            # ``timegm`` reads the fields as UTC. ``mktime`` would read them as local time, which
            # shifts the ``If-Modified-Since`` value that we send back by the local UTC offset.
            return float(timegm(parsed_date[:9]) - (parsed_date[9] or 0))
        except (ValueError, OverflowError):
            return 0.0

    @staticmethod
    def _keep_alive_timeout(header: str) -> int | None:
        """
        Extract the ``timeout`` value from a ``Keep-Alive`` header value.

        :param header: the raw header value, for example ``"timeout=5, max=1000"``.
        :returns: the last valid timeout in the header, or ``None`` if there is none.
        """
        timeout = None
        for directive in header.split(","):
            # Directives after the first one usually carry a leading space: strip before comparing.
            name, _, raw_value = directive.strip().partition("=")
            if name.strip().lower() == "timeout":
                with contextlib.suppress(ValueError):
                    timeout = int(raw_value)
        return timeout

    @staticmethod
    def _extract_torrent_links(content: bytes) -> set[str] | None:
        """
        Collect the text of all XML elements that ends with ``.torrent``.

        :param content: the raw RSS document.
        :returns: the links that were found, or ``None`` if the content is not well-formed XML.
        """
        try:
            tree = ET.parse(BytesIO(content))  # noqa: S314
        except ParseError:
            return None

        links = set()
        for element in tree.iter():
            # Pretty-printed feeds may surround the link with whitespace.
            text = (element.text or "").strip()
            if text.endswith(".torrent"):
                links.add(text)
        return links

    async def check(self) -> None:
        """
        Check our URL as lazily as possible.

        Nothing is requested while a back-off is active. Otherwise a conditional GET is sent and
        the RSS content is only parsed if the server answers with a HTTP 200.
        """
        if time() < self.next_check:
            logger.info("Skipping check, server requested backoff")
            return

        # Try to be kind to the server and perform a conditional HTTP GET.
        # If supported, the server will answer with a HTTP 304 error code when we don't need to do anything.
        last_modified_time = self._http_date_to_timestamp(self.last_modified)

        try:
            response, content = await self.conditional_get(last_modified_time)
        except Exception as e:
            logger.exception("Retrieving %s failed with: %s", self.url, e.__class__.__name__)
            self.next_check = time() + self.DEFAULT_BACKOFF
            return

        # Determine the back-off requested by the server.
        backoff = self._keep_alive_timeout(response.headers.get("Keep-Alive", ""))
        if backoff is None:
            backoff = self.DEFAULT_BACKOFF
        else:
            logger.info("%s requested timeout of %d seconds", self.url, backoff)
        self.next_check = time() + backoff

        # Only move the reference date forward for answers that describe the content. Storing the
        # date of an error response could hide changes that were made before that error.
        if response.status == HTTPOk.status_code:
            self.last_modified = response.headers.get("Date")
            await self.parse_rss(content)
        elif response.status == HTTPNotModified.status_code:
            self.last_modified = response.headers.get("Date")
            logger.info("%s conditional GET flagged no new content", self.url)

    async def parse_rss(self, content: bytes) -> None:
        """
        Check if the RSS content includes any new ``.torrent`` values and resolve those.

        :param content: the raw RSS document.
        """
        entries = self._extract_torrent_links(content)
        if entries is None:
            # Keep the known entries: forgetting them would make the next valid answer look
            # like it only contains new torrents.
            logger.warning("Ignoring malformed RSS content of %s", self.url)
            return

        new_entries = entries - self.previous_entries
        self.previous_entries = entries
        if new_entries:
            await self.resolve(new_entries)
156+
157+
158+
class RSSWatcherManager:
    """
    Manage multiple RSS URL watchers.

    Allowed in the URL list:
     - Empty RSS feeds, for user spacing/organization. Resolved here.
     - Duplicate RSS feeds. Resolved here.
     - Unreachable RSS feeds. Resolved in ``RSSWatcher``.
    """

    def __init__(self, task_manager: TaskManager, notifier: Notifier, urls: list[str]) -> None:
        """
        Initialize (but don't start) with a given taskmanager, notifier and urls.

        :param task_manager: the task manager that is handed to every watcher.
        :param notifier: the notifier that is handed to every watcher.
        :param urls: the RSS URLs to watch, empty strings and duplicates are dropped.
        """
        super().__init__()

        self.task_manager = task_manager
        self.notifier = notifier
        self.watchers = {url: RSSWatcher(task_manager, notifier, url) for url in set(urls) if url}

    def start(self) -> None:
        """
        Start all our watchers.
        """
        for watcher in self.watchers.values():
            watcher.start()

    def stop(self) -> None:
        """
        Stop all our watchers and forget about them.
        """
        for watcher in self.watchers.values():
            watcher.stop()
        self.watchers.clear()

    def update(self, urls: list[str]) -> None:
        """
        Update the RSS URLs that we are watching. Start and stop watchers accordingly.

        :param urls: the complete new list of RSS URLs, empty strings and duplicates are dropped.
        """
        # Build the set once: testing every watcher against the raw list is a linear scan per watcher.
        wanted = {url for url in urls if url}
        stopped = [url for url in self.watchers if url not in wanted]
        started = [url for url in wanted if url not in self.watchers]
        for url in stopped:
            watcher = self.watchers.pop(url)
            watcher.stop()
        for url in started:
            watcher = RSSWatcher(self.task_manager, self.notifier, url)
            self.watchers[url] = watcher
            watcher.start()

src/tribler/core/session.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DHTDiscoveryComponent,
2121
RecommenderComponent,
2222
RendezvousComponent,
23+
RSSComponent,
2324
TorrentCheckerComponent,
2425
TunnelComponent,
2526
VersioningComponent,
@@ -154,8 +155,8 @@ def register_launchers(self) -> None:
154155
Register all IPv8 launchers that allow communities to be loaded.
155156
"""
156157
for launcher_class in [ContentDiscoveryComponent, DatabaseComponent, DHTDiscoveryComponent,
157-
RecommenderComponent, RendezvousComponent, TorrentCheckerComponent, TunnelComponent,
158-
VersioningComponent, WatchFolderComponent]:
158+
RecommenderComponent, RendezvousComponent, RSSComponent, TorrentCheckerComponent,
159+
TunnelComponent, VersioningComponent, WatchFolderComponent]:
159160
instance = launcher_class()
160161
for rest_ep in instance.get_endpoints():
161162
self.rest_manager.add_endpoint(rest_ep)

src/tribler/test_unit/core/rss/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)