Skip to content

Commit 4e807f8

Browse files
committed
Merge pull request #799 from LipuFei/fix_metadata_next
Fix metadata injector
2 parents 73d4e1a + af92b30 commit 4e807f8

File tree

1 file changed

+188
-159
lines changed

1 file changed

+188
-159
lines changed

Tribler/Main/metadata-injector.py

+188-159
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
# modify the sys.stderr and sys.stdout for safe output
1010

11-
from traceback import print_exc
1211
import optparse
1312
import os
1413
import random
@@ -18,124 +17,195 @@
1817
from hashlib import sha1
1918
import logging
2019
import logging.config
21-
import binascii
2220

23-
from Tribler.Core.Utilities.twisted_thread import reactor
21+
from Tribler.Core.Utilities.twisted_thread import reactor, stop_reactor, reactor_thread
22+
from Tribler.dispersy.util import call_on_reactor_thread
2423

24+
from Tribler.Core.simpledefs import NTFY_DISPERSY, NTFY_STARTED
2525
from Tribler.Core.TorrentDef import TorrentDef
2626
from Tribler.Core.Session import Session
2727
from Tribler.Core.SessionConfig import SessionStartupConfig
2828
from Tribler.Main.Utility.Feeds.rssparser import RssParser
2929
from Tribler.Main.Utility.Feeds.dirfeed import DirectoryFeedThread
30-
from Tribler.Main.vwxGUI.SearchGridManager import TorrentManager, LibraryManager, \
31-
ChannelManager
30+
from Tribler.Main.vwxGUI.SearchGridManager import TorrentManager, LibraryManager, ChannelManager
3231
from Tribler.Main.vwxGUI.TorrentStateManager import TorrentStateManager
3332

34-
logger = logging.getLogger(__name__)
35-
36-
37-
def define_communities(session):
38-
from Tribler.community.allchannel.community import AllChannelCommunity
39-
from Tribler.community.channel.community import ChannelCommunity
40-
from Tribler.community.metadata.community import MetadataCommunity
41-
42-
dispersy = session.get_dispersy_instance()
43-
dispersy.define_auto_load(AllChannelCommunity,
44-
session.dispersy_member,
45-
load=True)
46-
dispersy.define_auto_load(ChannelCommunity,
47-
session.dispersy_member,
48-
load=True)
49-
dispersy.define_auto_load(MetadataCommunity,
50-
session.dispersy_member,
51-
load=True)
52-
logger.info(u"Dispersy communities are ready")
53-
54-
55-
def dispersy_started(session, opt, torrent_manager, channel_manager):
56-
channelname = opt.channelname if hasattr(opt, 'chanelname') else ''
57-
nickname = opt.nickname if hasattr(opt, 'nickname') else ''
58-
my_channel_name = channelname or nickname or 'MetadataInjector-Channel'
59-
my_channel_name = unicode(my_channel_name)
60-
61-
new_channel_created = False
62-
my_channel_id = channel_manager.channelcast_db.getMyChannelId()
63-
if not my_channel_id:
64-
logger.info(u"Create a new channel")
65-
channel_manager.createChannel(my_channel_name, u'')
66-
new_channel_created = True
67-
else:
68-
logger.info(u"Use existing channel %s", binascii.hexlify(my_channel_id))
69-
my_channel = channel_manager.getChannel(my_channel_id)
70-
if my_channel.name != my_channel_name:
71-
logger.info(u"Rename channel to %s", my_channel_name)
72-
channel_manager.modifyChannel(my_channel_id, {'name': my_channel_name})
73-
my_channel_id = channel_manager.channelcast_db.getMyChannelId()
74-
logger.info(u"Channel ID [%s]", my_channel_id)
75-
76-
def create_torrent_feed():
77-
logger.info(u"Creating RSS Feed...")
7833

34+
class MetadataInjector(object):
35+
36+
def __init__(self, opt):
37+
self._logger = logging.getLogger(self.__class__.__name__)
38+
39+
self._opt = opt
40+
self._session = None
41+
42+
self._torrent_manager = None
43+
self._library_manager = None
44+
self._channel_manager = None
45+
self._torrent_state_manager = None
46+
47+
def initialize(self):
48+
sscfg = SessionStartupConfig()
49+
if self._opt.statedir:
50+
sscfg.set_state_dir(unicode(os.path.realpath(self._opt.statedir)))
51+
if self._opt.port:
52+
sscfg.set_dispersy_port(self._opt.port)
53+
if self._opt.nickname:
54+
sscfg.set_nickname(self._opt.nickname)
55+
56+
sscfg.set_megacache(True)
57+
sscfg.set_torrent_collecting(True)
58+
59+
self._logger.info(u"Starting session...")
60+
self._session = Session(sscfg)
61+
# add dispersy start callbacks
62+
#self._session.add_observer(self.init_managers, NTFY_DISPERSY, [NTFY_STARTED])
63+
self._session.add_observer(self.define_communities, NTFY_DISPERSY, [NTFY_STARTED])
64+
self._session.add_observer(self.dispersy_started, NTFY_DISPERSY, [NTFY_STARTED])
65+
self._session.start()
66+
67+
def init_managers(self):
68+
self._logger.info(u"Initializing managers...")
69+
torrent_manager = TorrentManager(None)
70+
library_manager = LibraryManager(None)
71+
channel_manager = ChannelManager()
72+
torrent_manager.connect(self._session, library_manager, channel_manager)
73+
library_manager.connect(self._session, torrent_manager, channel_manager)
74+
channel_manager.connect(self._session, library_manager, torrent_manager)
75+
76+
torrent_state_manager = TorrentStateManager()
77+
torrent_state_manager.connect(torrent_manager, library_manager, channel_manager)
78+
79+
self._torrent_manager = torrent_manager
80+
self._library_manager = library_manager
81+
self._channel_manager = channel_manager
82+
self._torrent_state_manager = torrent_state_manager
83+
84+
def shutdown(self):
85+
self._logger.info(u"Shutting down metadata-injector...")
7986
torrentfeed = RssParser.getInstance()
80-
torrentfeed.register(session, my_channel_id)
81-
torrentfeed.addCallback(my_channel_id, torrent_manager.createMetadataModificationFromDef)
82-
83-
for rss in opt.rss.split(";"):
84-
torrentfeed.addURL(rss, my_channel_id)
85-
86-
if hasattr(opt, 'rss') and opt.rss:
87-
create_torrent_feed()
88-
89-
def create_dir_feed():
90-
logger.info(u"Creating Dir Feed...")
91-
92-
def on_torrent_callback(dirpath, infohash, torrent_data):
93-
torrentdef = TorrentDef.load_from_dict(torrent_data)
94-
channel_manager.createTorrentFromDef(my_channel_id, torrentdef)
95-
96-
# save torrent to collectedtorrents
97-
filename = torrent_manager.getCollectedFilenameFromDef(torrentdef)
98-
if not os.path.isfile(filename):
99-
torrentdef.save(filename)
87+
torrentfeed.shutdown()
10088

10189
dirfeed = DirectoryFeedThread.getInstance()
102-
for dirpath in opt.dir.split(";"):
103-
dirfeed.addDir(dirpath, callback=on_torrent_callback)
104-
105-
if hasattr(opt, 'dir') and opt.dir:
106-
create_dir_feed()
107-
108-
def create_file_feed():
109-
logger.info(u"Creating File Feed...")
110-
community = channel_manager._disp_get_community_from_channel_id(my_channel_id)
111-
112-
logger.info("Using community: %s", community._cid.encode('HEX'))
113-
114-
items = json.load(open(opt.file, 'rb'))
115-
for item in items:
116-
try:
117-
infohash = sha1(item['name']).digest()
118-
except:
119-
infohash = sha1(str(random.randint(0, 1000000))).digest()
120-
message = community._disp_create_torrent(infohash, long(time.time()), unicode(item['name']), ((u'fake.file', 10),), tuple(), update=False, forward=False)
121-
122-
logger.info("Created a new torrent")
123-
124-
latest_review = None
125-
for modification in item['modifications']:
126-
reviewmessage = community._disp_create_modification('description', unicode(modification['text']), long(time.time()), message, latest_review, update=False, forward=False)
127-
128-
logger.info("Created a new modification")
129-
130-
if modification['revert']:
131-
community._disp_create_moderation('reverted', long(time.time()), 0, reviewmessage.packet_id, update=False, forward=False)
132-
133-
logger.info("Reverted the last modification")
134-
else:
135-
latest_review = reviewmessage
136-
137-
if hasattr(opt, 'file') and opt.file and new_channel_created:
138-
create_file_feed()
90+
dirfeed.shutdown()
91+
92+
self._torrent_state_manager.delInstance()
93+
self._channel_manager.delInstance()
94+
self._library_manager.delInstance()
95+
self._torrent_manager.delInstance()
96+
97+
self._session.shutdown()
98+
99+
self._torrent_state_manager = None
100+
self._channel_manager = None
101+
self._library_manager = None
102+
self._torrent_manager = None
103+
self._session = None
104+
105+
@call_on_reactor_thread
106+
def define_communities(self, *args):
107+
from Tribler.community.allchannel.community import AllChannelCommunity
108+
from Tribler.community.channel.community import ChannelCommunity
109+
from Tribler.community.metadata.community import MetadataCommunity
110+
111+
dispersy = self._session.get_dispersy_instance()
112+
dispersy.define_auto_load(AllChannelCommunity,
113+
self._session.dispersy_member,
114+
load=True)
115+
dispersy.define_auto_load(ChannelCommunity,
116+
self._session.dispersy_member,
117+
load=True)
118+
dispersy.define_auto_load(MetadataCommunity,
119+
self._session.dispersy_member,
120+
load=True)
121+
self._logger.info(u"Dispersy communities are ready")
122+
123+
def dispersy_started(self, *args):
124+
self.init_managers()
125+
126+
channelname = self._opt.channelname if hasattr(self._opt, 'chanelname') else ''
127+
nickname = self._opt.nickname if hasattr(self._opt, 'nickname') else ''
128+
my_channel_name = channelname or nickname or 'MetadataInjector-Channel'
129+
my_channel_name = unicode(my_channel_name)
130+
131+
new_channel_created = False
132+
my_channel_id = self._channel_manager.channelcast_db.getMyChannelId()
133+
if not my_channel_id:
134+
self._logger.info(u"Create a new channel")
135+
self._channel_manager.createChannel(my_channel_name, u'')
136+
new_channel_created = True
137+
else:
138+
self._logger.info(u"Use existing channel %s", str(my_channel_id))
139+
my_channel = self._channel_manager.getChannel(my_channel_id)
140+
if my_channel.name != my_channel_name:
141+
self._logger.info(u"Rename channel to %s", my_channel_name)
142+
self._channel_manager.modifyChannel(my_channel_id, {'name': my_channel_name})
143+
my_channel_id = self._channel_manager.channelcast_db.getMyChannelId()
144+
self._logger.info(u"Channel ID [%s]", my_channel_id)
145+
146+
def create_torrent_feed():
147+
self._logger.info(u"Creating RSS Feed...")
148+
149+
torrentfeed = RssParser.getInstance()
150+
torrentfeed.register(self._session, my_channel_id)
151+
torrentfeed.addCallback(my_channel_id, self._torrent_manager.createMetadataModificationFromDef)
152+
153+
for rss in self._opt.rss.split(";"):
154+
torrentfeed.addURL(rss, my_channel_id)
155+
156+
if hasattr(self._opt, 'rss') and self._opt.rss:
157+
create_torrent_feed()
158+
159+
def create_dir_feed():
160+
self._logger.info(u"Creating Dir Feed...")
161+
162+
def on_torrent_callback(dirpath, infohash, torrent_data):
163+
torrentdef = TorrentDef.load_from_dict(torrent_data)
164+
self._channel_manager.createTorrentFromDef(my_channel_id, torrentdef)
165+
166+
# save torrent to collectedtorrents
167+
filename = self._torrent_manager.getCollectedFilenameFromDef(torrentdef)
168+
if not os.path.isfile(filename):
169+
torrentdef.save(filename)
170+
171+
dirfeed = DirectoryFeedThread.getInstance()
172+
for dirpath in self._opt.dir.split(";"):
173+
dirfeed.addDir(dirpath, callback=on_torrent_callback)
174+
175+
if hasattr(self._opt, 'dir') and self._opt.dir:
176+
create_dir_feed()
177+
178+
def create_file_feed():
179+
self._logger.info(u"Creating File Feed...")
180+
community = self._channel_manager._disp_get_community_from_channel_id(my_channel_id)
181+
182+
self._logger.info("Using community: %s", community._cid.encode('HEX'))
183+
184+
items = json.load(open(self._opt.file, 'rb'))
185+
for item in items:
186+
try:
187+
infohash = sha1(item['name']).digest()
188+
except:
189+
infohash = sha1(str(random.randint(0, 1000000))).digest()
190+
message = community._disp_create_torrent(infohash, long(time.time()), unicode(item['name']), ((u'fake.file', 10),), tuple(), update=False, forward=False)
191+
192+
self._logger.info("Created a new torrent")
193+
194+
latest_review = None
195+
for modification in item['modifications']:
196+
reviewmessage = community._disp_create_modification('description', unicode(modification['text']), long(time.time()), message, latest_review, update=False, forward=False)
197+
198+
self._logger.info("Created a new modification")
199+
200+
if modification['revert']:
201+
community._disp_create_moderation('reverted', long(time.time()), 0, reviewmessage.packet_id, update=False, forward=False)
202+
203+
self._logger.info("Reverted the last modification")
204+
else:
205+
latest_review = reviewmessage
206+
207+
if hasattr(self._opt, 'file') and self._opt.file and new_channel_created:
208+
create_file_feed()
139209

140210

141211
def main():
@@ -153,65 +223,24 @@ def main():
153223

154224
if not (opt.rss or opt.dir or opt.file):
155225
command_line_parser.print_help()
156-
logger.info("\nExample: python Tribler/Main/metadata-injector.py --rss http://frayja.com/rss.php --nickname frayja --channelname goldenoldies")
226+
print >> sys.stderr, "\nExample: python Tribler/Main/metadata-injector.py --rss http://frayja.com/rss.php --nickname frayja --channelname goldenoldies"
157227
sys.exit()
158228

159-
logger.info("Type Q followed by <ENTER> to stop the metadata-injector")
160-
161-
sscfg = SessionStartupConfig()
162-
if opt.statedir:
163-
sscfg.set_state_dir(unicode(os.path.realpath(opt.statedir)))
164-
if opt.port:
165-
sscfg.set_dispersy_port(opt.port)
166-
if opt.nickname:
167-
sscfg.set_nickname(opt.nickname)
168-
169-
sscfg.set_megacache(True)
170-
sscfg.set_torrent_collecting(True)
229+
metadata_injector = MetadataInjector(opt)
230+
metadata_injector.initialize()
171231

172-
logger.info("Starting session ...")
173-
session = Session(sscfg)
174-
session.start()
175-
176-
logger.info("Initializing managers ...")
177-
torrent_manager = TorrentManager(None)
178-
library_manager = LibraryManager(None)
179-
channel_manager = ChannelManager()
180-
torrent_manager.connect(session, library_manager, channel_manager)
181-
library_manager.connect(session, torrent_manager, channel_manager)
182-
channel_manager.connect(session, library_manager, torrent_manager)
183-
184-
torrent_state_manager = TorrentStateManager()
185-
torrent_state_manager.connect(torrent_manager, library_manager, channel_manager)
186-
187-
logger.info("Defining communities ...")
188-
reactor.callLater(2, define_communities, session)
189-
reactor.callLater(5, dispersy_started, session, opt, torrent_manager, channel_manager)
232+
print >> sys.stderr, "Type Q followed by <ENTER> to stop the metadata-injector"
190233

191234
# condition variable would be prettier, but that don't listen to
192235
# KeyboardInterrupt
193-
try:
194-
while True:
195-
x = sys.stdin.readline()
196-
logger.info(repr(x))
197-
if x.strip() == 'Q':
198-
break
199-
except:
200-
print_exc()
201-
202-
torrentfeed = RssParser.getInstance()
203-
torrentfeed.shutdown()
204-
205-
dirfeed = DirectoryFeedThread.getInstance()
206-
dirfeed.shutdown()
207-
208-
torrent_state_manager.delInstance()
209-
channel_manager.delInstance()
210-
library_manager.delInstance()
211-
torrent_manager.delInstance()
212-
213-
session.shutdown()
214-
logger.info("Shutting down...")
236+
import signal
237+
signal.signal(signal.SIGTERM, stop_reactor)
238+
signal.signal(signal.SIGQUIT, stop_reactor)
239+
reactor.addSystemEventTrigger('before', 'shutdown', metadata_injector.shutdown)
240+
241+
reactor_thread.join()
242+
243+
print >> sys.stderr, "Shutting down..."
215244
time.sleep(5)
216245

217246

0 commit comments

Comments
 (0)