63 changes: 38 additions & 25 deletions changesetmd.py
@@ -50,23 +50,28 @@ def createTables(self, connection):
             cursor.execute(queries.createGeometryColumn)
         connection.commit()

-    def insertNew(self, connection, id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, comments):
+    def insertNewBatch(self, connection, data_arr):
         cursor = connection.cursor()
         if self.createGeometry:
-            cursor.execute('''INSERT into osm_changeset
+            sql = '''INSERT into osm_changeset
                     (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags, geom)
-                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))''',
-                    (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags, minLon, minLat, maxLon, maxLat))
+                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakeEnvelope(%s,%s,%s,%s), 4326))'''
+            psycopg2.extras.execute_batch(cursor, sql, data_arr)
+            cursor.close()
         else:
-            cursor.execute('''INSERT into osm_changeset
+            sql = '''INSERT into osm_changeset
                     (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, open, num_changes, user_name, tags)
-                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
-                    (id, userId, createdAt, minLat, maxLat, minLon, maxLon, closedAt, open, numChanges, userName, tags))
-        for comment in comments:
-            cursor.execute('''INSERT into osm_changeset_comment
+                    values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+            psycopg2.extras.execute_batch(cursor, sql, data_arr)
+            cursor.close()
+
+    def insertNewBatchComment(self, connection, comment_arr):
+        cursor=connection.cursor()
+        sql = '''INSERT into osm_changeset_comment
                 (comment_changeset_id, comment_user_id, comment_user_name, comment_date, comment_text)
-                values (%s,%s,%s,%s,%s)''',
-                (id, comment['uid'], comment['user'], comment['date'], comment['text']))
+                values (%s,%s,%s,%s,%s)'''
+        psycopg2.extras.execute_batch(cursor, sql, comment_arr)
+        cursor.close()

     def deleteExisting(self, connection, id):
         cursor = connection.cursor()
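The heart of this change: `insertNew`, which issued one `cursor.execute()` per changeset and per comment, becomes `insertNewBatch`/`insertNewBatchComment`, which take arrays of pre-built parameter tuples and hand them to `psycopg2.extras.execute_batch()`, cutting server round trips dramatically. A minimal sketch of that pattern, assuming a simplified two-column table and an illustrative DSN (both hypothetical, not the project's real schema):

    import psycopg2
    import psycopg2.extras

    connection = psycopg2.connect("dbname=osm")  # hypothetical DSN
    cursor = connection.cursor()

    sql = '''INSERT into example_table (id, user_name) values (%s,%s)'''
    rows = [(1, 'alice'), (2, 'bob'), (3, 'carol')]  # one tuple per row, matching the placeholders

    # execute_batch (psycopg2 >= 2.7) sends up to page_size parameter sets
    # per round trip instead of one network exchange per row.
    psycopg2.extras.execute_batch(cursor, sql, rows, page_size=100)
    cursor.close()
    connection.commit()

Note that each tuple in `data_arr` must match the placeholder order exactly; in the geometry branch the min/max lon/lat values appear twice, once for the plain columns and again for `ST_MakeEnvelope`.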
@@ -81,6 +86,8 @@ def parseFile(self, connection, changesetFile, doReplication):
         cursor = connection.cursor()
         context = etree.iterparse(changesetFile)
         action, root = context.next()
+        changesets = []
+        comments = []
         for action, elem in context:
             if(elem.tag != 'changeset'):
                 continue
@@ -91,35 +98,41 @@ def parseFile(self, connection, changesetFile, doReplication):
             for tag in elem.iterchildren(tag='tag'):
                 tags[tag.attrib['k']] = tag.attrib['v']

-            comments = []
             for discussion in elem.iterchildren(tag='discussion'):
                 for commentElement in discussion.iterchildren(tag='comment'):
-                    comment = dict()
-                    comment['uid'] = commentElement.attrib.get('uid')
-                    comment['user'] = commentElement.attrib.get('user')
-                    comment['date'] = commentElement.attrib.get('date')
                     for text in commentElement.iterchildren(tag='text'):
-                        comment['text'] = text.text
+                        text = text.text
+                    comment = (elem.attrib['id'], commentElement.attrib.get('uid'), commentElement.attrib.get('user'), commentElement.attrib.get('date'), text)
                     comments.append(comment)

             if(doReplication):
                 self.deleteExisting(connection, elem.attrib['id'])

-            self.insertNew(connection, elem.attrib['id'], elem.attrib.get('uid', None),
-                           elem.attrib['created_at'], elem.attrib.get('min_lat', None),
-                           elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None),
-                           elem.attrib.get('max_lon', None),elem.attrib.get('closed_at', None),
-                           elem.attrib.get('open', None), elem.attrib.get('num_changes', None),
-                           elem.attrib.get('user', None), tags, comments)
-
-            if((parsedCount % 10000) == 0):
+            if self.createGeometry:
+                changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None),
+                                   elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None),
+                                   elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags,elem.attrib.get('min_lon', None), elem.attrib.get('min_lat', None),
+                                   elem.attrib.get('max_lon', None), elem.attrib.get('max_lat', None)))
+            else:
+                changesets.append((elem.attrib['id'], elem.attrib.get('uid', None), elem.attrib['created_at'], elem.attrib.get('min_lat', None),
+                                   elem.attrib.get('max_lat', None), elem.attrib.get('min_lon', None), elem.attrib.get('max_lon', None), elem.attrib.get('closed_at', None),
+                                   elem.attrib.get('open', None), elem.attrib.get('num_changes', None), elem.attrib.get('user', None), tags))
+
+            if((parsedCount % 100000) == 0):
+                self.insertNewBatch(connection, changesets)
+                self.insertNewBatchComment(connection, comments )
+                changesets = []
+                comments = []
                 print "parsed %s" % ('{:,}'.format(parsedCount))
                 print "cumulative rate: %s/sec" % '{:,.0f}'.format(parsedCount/timedelta.total_seconds(datetime.now() - startTime))

             #clear everything we don't need from memory to avoid leaking
             elem.clear()
             while elem.getprevious() is not None:
                 del elem.getparent()[0]
+        # Update whatever is left, then commit
+        self.insertNewBatch(connection, changesets)
+        self.insertNewBatchComment(connection, comments)
         connection.commit()
         print "parsing complete"
         print "parsed {:,}".format(parsedCount)