44import irc3
55import datetime
66from irc3 .compat import asyncio
7- from concurrent .futures import ThreadPoolExecutor
87
98__doc__ = '''
109==========================================
2221 irc3.plugins.feeds
2322
2423 [irc3.plugins.feeds]
25- channels = #irc3 # global channel to notify
26- delay = 5 # delay to check feeds
27- directory = ~/.irc3/feeds # directory to store feeds
24+ channels = #irc3 # global channel to notify
25+ delay = 5 # delay to check feeds in minutes
26+ directory = ~/.irc3/feeds # directory to store feeds
2827 hook = irc3.plugins.feeds.default_hook # dotted name to a callable
2928 fmt = [{name}] {entry.title} - {entry.link} # formatter
3029
3433 github/irc3.fmt = [{feed.name}] New commit: {entry.title} - {entry.link}
3534 # custom channels
3635 github/irc3.channels = #irc3dev #irc3
37- # custom delay
36+ # custom delay in minutes
3837 github/irc3.delay = 10
3938
Hook is a dotted name referring to a callable (function or class) which takes a
6463'''
6564
# Default HTTP headers sent with every feed request: identify the client
# and ask servers/proxies not to hand back stale cached copies.
HEADERS = {
    'User-Agent': 'python-aiohttp /irc3/feeds',
    'Cache-Control': 'max-age=0',
    'Pragma': 'no-cache',
}
@@ -82,21 +81,6 @@ def dispatcher(messages):
8281 return dispatcher
8382
8483
def fetch(args):
    """Download each feed URL in ``args`` and cache the raw body on disk.

    ``args`` maps ``'requests'`` to an HTTP client, ``'feeds'`` to URLs,
    ``'filenames'`` to target cache paths, and ``'name'`` to the feed's
    name, which is returned so the caller can tell which feed finished.
    """
    http = args['requests']
    for url, path in zip(args['feeds'], args['filenames']):
        try:
            body = http.get(url, timeout=5, headers=HEADERS).content
        except Exception:  # pragma: no cover
            # Best effort: a failed fetch keeps the previous cached copy.
            pass
        else:
            with open(path, 'wb') as cache:
                cache.write(body)
    return args['name']
99-
# Timestamp layout used for the per-feed ``*.updated`` marker files
# (ISO 8601 to seconds precision, no timezone).
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S"
10185
10286
@@ -108,7 +92,7 @@ def parse(feedparser, args):
10892
10993 for filename in args ['filenames' ]:
11094 try :
111- with open (filename + '.updated' ) as fd :
95+ with open (filename + '.updated' , encoding = "UTF-8" ) as fd :
11296 updated = datetime .datetime .strptime (
11397 fd .read ()[:len ("YYYY-MM-DDTHH:MM:SS" )], ISO_FORMAT
11498 )
@@ -146,8 +130,6 @@ def parse(feedparser, args):
146130class Feeds :
147131 """Feeds plugin"""
148132
149- PoolExecutor = ThreadPoolExecutor
150-
151133 def __init__ (self , bot ):
152134 bot .feeds = self
153135 self .bot = bot
@@ -207,7 +189,16 @@ def __init__(self, bot):
207189
208190 def connection_made (self ):
209191 """Initialize checkings"""
210- self .bot .loop .call_later (10 , self .update )
192+ self .bot .create_task (self .periodically_update ())
193+
194+ async def periodically_update (self ):
195+ """After a connection has been made, call update feeds periodically."""
196+ if not self .aiohttp or not self .feedparser :
197+ return
198+ await asyncio .sleep (10 )
199+ while True :
200+ await self .update ()
201+ await asyncio .sleep (self .delay )
211202
212203 def imports (self ):
213204 """show some warnings if needed"""
@@ -218,14 +209,14 @@ def imports(self):
218209 self .bot .log .critical ('feedparser is not installed' )
219210 self .feedparser = None
220211 try :
221- import requests
212+ import aiohttp
222213 except ImportError : # pragma: no cover
223- self .bot .log .critical ('requests is not installed' )
224- self .requests = None
214+ self .bot .log .critical ('aiohttp is not installed' )
215+ self .aiohttp = None
225216 else :
226- self .requests = requests
217+ self .aiohttp = aiohttp
227218
228- def parse (self , * args ):
219+ def parse (self ):
229220 """parse pre-fetched feeds and notify new entries"""
230221 entries = []
231222 for feed in self .feeds .values ():
@@ -237,33 +228,37 @@ def messages():
237228 if entry :
238229 feed = entry .feed
239230 message = feed ['fmt' ].format (feed = feed , entry = entry )
240- for c in feed ['channels' ]:
241- yield c , message
231+ for channel in feed ['channels' ]:
232+ yield channel , message
242233
243234 self .dispatcher (messages ())
244235
245- def update_time (self , future ):
246- name = future .result ()
247- self .bot .log .debug ('Feed %s fetched' , name )
248- feed = self .feeds [name ]
249- feed ['time' ] = time .time ()
250-
251- def update (self ):
236+ async def update (self ):
252237 """update feeds"""
253- loop = self .bot .loop
254- loop .call_later (self .delay , self .update )
255-
256238 now = time .time ()
257- feeds = [dict (f , requests = self .requests ) for f in self .feeds .values ()
258- if f ['time' ] < now - f ['delay' ]]
259- if feeds :
260- self .bot .log .info ('Fetching feeds %s' ,
261- ', ' .join ([f ['name' ] for f in feeds ]))
262- tasks = []
263- for feed in feeds :
264- task = loop .run_in_executor (None , fetch , feed )
265- task .add_done_callback (self .update_time )
266- tasks .append (task )
267- task = self .bot .create_task (
268- asyncio .wait (tasks , timeout = len (feeds ) * 2 , loop = loop ))
269- task .add_done_callback (self .parse )
239+ feeds = [feed for feed in self .feeds .values ()
240+ if feed ['time' ] < now - feed ['delay' ]]
241+ if not feeds :
242+ return
243+ self .bot .log .info ('Fetching feeds %s' ,
244+ ', ' .join ([f ['name' ] for f in feeds ]))
245+ timeout = self .aiohttp .ClientTimeout (total = 5 )
246+ async with self .aiohttp .ClientSession (timeout = timeout ) as session :
247+ await asyncio .gather (
248+ * [self .fetch (feed , session ) for feed in feeds ]
249+ )
250+ self .parse ()
251+
252+ async def fetch (self , feed , session ):
253+ """fetch a feed"""
254+ for url , filename in zip (feed ['feeds' ], feed ['filenames' ]):
255+ try :
256+ async with session .get (url , headers = HEADERS ) as resp :
257+ with open (filename , 'wb' ) as file :
258+ file .write (await resp .read ())
259+ except Exception : # pragma: no cover
260+ self .bot .log .exception (
261+ "Exception while fetching feed %s" , feed ['name' ]
262+ )
263+ self .bot .log .debug ('Feed %s fetched' , feed ['name' ])
264+ feed ['time' ] = time .time ()
0 commit comments