44import irc3
55import datetime
66from irc3 .compat import asyncio
7- from concurrent .futures import ThreadPoolExecutor
87
98__doc__ = '''
109==========================================
2221 irc3.plugins.feeds
2322
2423 [irc3.plugins.feeds]
    channels = #irc3           # global channel to notify
    delay = 5                  # delay to check feeds in minutes
    directory = ~/.irc3/feeds  # directory to store feeds
2827 hook = irc3.plugins.feeds.default_hook # dotted name to a callable
2928 fmt = [{name}] {entry.title} - {entry.link} # formatter
3029
3433 github/irc3.fmt = [{feed.name}] New commit: {entry.title} - {entry.link}
3534 # custom channels
3635 github/irc3.channels = #irc3dev #irc3
    # custom delay in minutes
3837 github/irc3.delay = 10
3938
4039Hook is a dotted name refering to a callable (function or class) wich take a
6362
6463'''
6564
# HTTP headers sent with every feed request: identify this client and ask
# intermediate caches for a fresh copy instead of a stale cached feed.
HEADERS = {
    'User-Agent': 'python-aiohttp/irc3/feeds',
    'Cache-Control': 'max-age=0',
    'Pragma': 'no-cache',
}
70+
6671
6772def default_hook (entries ):
6873 """Default hook called for each entry"""
@@ -76,21 +81,6 @@ def dispatcher(messages):
7681 return dispatcher
7782
7883
79- def fetch (args ):
80- """fetch a feed"""
81- session = args ['session' ]
82- for feed , filename in zip (args ['feeds' ], args ['filenames' ]):
83- try :
84- resp = session .get (feed , timeout = 5 )
85- content = resp .content
86- except Exception : # pragma: no cover
87- pass
88- else :
89- with open (filename , 'wb' ) as fd :
90- fd .write (content )
91- return args ['name' ]
92-
93-
# strftime/strptime format of the timestamp stored in each feed's
# '<filename>.updated' marker file (read back by parse()).
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S"
9585
9686
@@ -102,7 +92,7 @@ def parse(feedparser, args):
10292
10393 for filename in args ['filenames' ]:
10494 try :
105- with open (filename + '.updated' ) as fd :
95+ with open (filename + '.updated' , encoding = "UTF-8" ) as fd :
10696 updated = datetime .datetime .strptime (
10797 fd .read ()[:len ("YYYY-MM-DDTHH:MM:SS" )], ISO_FORMAT
10898 )
@@ -140,14 +130,6 @@ def parse(feedparser, args):
140130class Feeds :
141131 """Feeds plugin"""
142132
143- PoolExecutor = ThreadPoolExecutor
144-
145- headers = {
146- 'User-Agent' : 'python-requests/irc3/feeds' ,
147- 'Cache-Control' : 'max-age=0' ,
148- 'Pragma' : 'no-cache' ,
149- }
150-
151133 def __init__ (self , bot ):
152134 bot .feeds = self
153135 self .bot = bot
@@ -177,7 +159,6 @@ def __init__(self, bot):
177159 fmt = config .get ('fmt' , '[{feed.name}] {entry.title} {entry.link}' ),
178160 delay = delay ,
179161 channels = config .get ('channels' , '' ),
180- headers = self .headers ,
181162 time = 0 ,
182163 )
183164
@@ -208,7 +189,16 @@ def __init__(self, bot):
208189
209190 def connection_made (self ):
210191 """Initialize checkings"""
211- self .bot .loop .call_later (10 , self .update )
192+ self .bot .create_task (self .periodically_update ())
193+
194+ async def periodically_update (self ):
195+ """After a connection has been made, call update feeds periodically."""
196+ if not self .aiohttp or not self .feedparser :
197+ return
198+ await asyncio .sleep (10 )
199+ while True :
200+ await self .update ()
201+ await asyncio .sleep (self .delay )
212202
213203 def imports (self ):
214204 """show some warnings if needed"""
@@ -219,15 +209,14 @@ def imports(self):
219209 self .bot .log .critical ('feedparser is not installed' )
220210 self .feedparser = None
221211 try :
222- import requests
212+ import aiohttp
223213 except ImportError : # pragma: no cover
224- self .bot .log .critical ('requests is not installed' )
225- self .session = None
214+ self .bot .log .critical ('aiohttp is not installed' )
215+ self .aiohttp = None
226216 else :
227- self .session = requests .Session ()
228- self .session .headers .update (self .headers )
217+ self .aiohttp = aiohttp
229218
230- def parse (self , * args ):
219+ def parse (self ):
231220 """parse pre-fetched feeds and notify new entries"""
232221 entries = []
233222 for feed in self .feeds .values ():
@@ -239,34 +228,37 @@ def messages():
239228 if entry :
240229 feed = entry .feed
241230 message = feed ['fmt' ].format (feed = feed , entry = entry )
242- for c in feed ['channels' ]:
243- yield c , message
231+ for channel in feed ['channels' ]:
232+ yield channel , message
244233
245234 self .dispatcher (messages ())
246235
247- def update_time (self , future ):
248- name = future .result ()
249- self .bot .log .debug ('Feed %s fetched' , name )
250- feed = self .feeds [name ]
251- feed ['time' ] = time .time ()
252-
253- def update (self ):
236+ async def update (self ):
254237 """update feeds"""
255- loop = self .bot .loop
256- loop .call_later (self .delay , self .update )
257-
258238 now = time .time ()
259- session = self .session
260- feeds = [dict (f , session = session ) for f in self .feeds .values ()
261- if f ['time' ] < now - f ['delay' ]]
262- if feeds :
263- self .bot .log .info ('Fetching feeds %s' ,
264- ', ' .join ([f ['name' ] for f in feeds ]))
265- tasks = []
266- for feed in feeds :
267- task = loop .run_in_executor (None , fetch , feed )
268- task .add_done_callback (self .update_time )
269- tasks .append (task )
270- task = self .bot .create_task (
271- asyncio .wait (tasks , timeout = len (feeds ) * 2 , loop = loop ))
272- task .add_done_callback (self .parse )
239+ feeds = [feed for feed in self .feeds .values ()
240+ if feed ['time' ] < now - feed ['delay' ]]
241+ if not feeds :
242+ return
243+ self .bot .log .info ('Fetching feeds %s' ,
244+ ', ' .join ([f ['name' ] for f in feeds ]))
245+ timeout = self .aiohttp .ClientTimeout (total = 5 )
246+ async with self .aiohttp .ClientSession (timeout = timeout ) as session :
247+ await asyncio .gather (
248+ * [self .fetch (feed , session ) for feed in feeds ]
249+ )
250+ self .parse ()
251+
252+ async def fetch (self , feed , session ):
253+ """fetch a feed"""
254+ for url , filename in zip (feed ['feeds' ], feed ['filenames' ]):
255+ try :
256+ async with session .get (url , headers = HEADERS ) as resp :
257+ with open (filename , 'wb' ) as file :
258+ file .write (await resp .read ())
259+ except Exception : # pragma: no cover
260+ self .bot .log .exception (
261+ "Exception while fetching feed %s" , feed ['name' ]
262+ )
263+ self .bot .log .debug ('Feed %s fetched' , feed ['name' ])
264+ feed ['time' ] = time .time ()
0 commit comments