New processing, state in the DB
This commit is contained in:
parent
84c860d591
commit
2ed1c009a7
9 changed files with 88 additions and 92 deletions
|
@ -6,7 +6,12 @@ from playhouse.db_url import connect
|
|||
database = connect(config.DATABASE_URI)
|
||||
|
||||
|
||||
class Change(Model):
|
||||
class BaseModel(Model):
|
||||
class Meta:
|
||||
database = database
|
||||
|
||||
|
||||
class Change(BaseModel):
|
||||
"""A model for the change. Just a single table."""
|
||||
changeset = IntegerField()
|
||||
user = CharField(max_length=250, index=True)
|
||||
|
@ -51,24 +56,20 @@ class Change(Model):
|
|||
tags[t].append('nothing')
|
||||
return tags
|
||||
|
||||
class Meta:
|
||||
database = database
|
||||
|
||||
|
||||
class Seen(Model):
|
||||
class Seen(BaseModel):
|
||||
"""A model for a storage of processed objects."""
|
||||
obj = TextField(index=True)
|
||||
|
||||
class Meta:
|
||||
database = database
|
||||
|
||||
|
||||
class User(Model):
|
||||
class User(BaseModel):
|
||||
"""A model for user stats."""
|
||||
user = CharField(max_length=250, unique=True)
|
||||
edits = IntegerField()
|
||||
rank = IntegerField(default=0)
|
||||
joined = DateField()
|
||||
|
||||
class Meta:
|
||||
database = database
|
||||
|
||||
class State(BaseModel):
|
||||
"""A model for storing replication state."""
|
||||
state = IntegerField()
|
||||
|
|
7
mmwatch/process.py
Executable file
7
mmwatch/process.py
Executable file
|
@ -0,0 +1,7 @@
|
|||
#!/usr/bin/env python
|
||||
from server import mapsme_process
|
||||
from server import parse_notes
|
||||
print 'Processing changes'
|
||||
mapsme_process.process()
|
||||
print 'Processing notes'
|
||||
parse_notes.process_notes()
|
0
mmwatch/server/__init__.py
Normal file
0
mmwatch/server/__init__.py
Normal file
46
mmwatch/server/mapsme-process.py → mmwatch/server/mapsme_process.py
Executable file → Normal file
46
mmwatch/server/mapsme-process.py → mmwatch/server/mapsme_process.py
Executable file → Normal file
|
@ -1,11 +1,10 @@
|
|||
#!/usr/bin/env python
|
||||
import sys, os, urllib2, re, gzip, json
|
||||
import urllib2, re, gzip, json
|
||||
from db import *
|
||||
from lxml import etree
|
||||
from StringIO import StringIO
|
||||
from datetime import datetime, date
|
||||
|
||||
STATE_FILENAME = os.path.join(path, 'mapsme-state.txt')
|
||||
REPLICATION_BASE_URL = 'http://planet.openstreetmap.org/replication/changesets'
|
||||
API_ENDPOINT = 'https://api.openstreetmap.org/api/0.6'
|
||||
MAIN_TAGS = ('amenity', 'shop', 'tourism', 'historic', 'craft', 'office', 'emergency', 'barrier',
|
||||
|
@ -21,22 +20,6 @@ def download_last_state():
|
|||
return int(m.group(1))
|
||||
|
||||
|
||||
def read_last_state():
|
||||
state = None
|
||||
try:
|
||||
with open(STATE_FILENAME, 'r') as f:
|
||||
m = re.search(r'\d+', f.read())
|
||||
state = int(m.group(0))
|
||||
except:
|
||||
pass
|
||||
return state
|
||||
|
||||
|
||||
def write_last_state(state):
|
||||
with open(STATE_FILENAME, 'w') as f:
|
||||
f.write(str(state))
|
||||
|
||||
|
||||
def filter_changeset(changeset):
|
||||
"""A changeset object is a dict of tags plus 'id', 'timestamp' and 'user' fields."""
|
||||
return 'created_by' in changeset and 'maps.me' in changeset['created_by'].lower()
|
||||
|
@ -254,21 +237,25 @@ def update_user_ranks():
|
|||
count += 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
def process():
|
||||
print 'downloading'
|
||||
try:
|
||||
cur_state = download_last_state()
|
||||
except Exception as e:
|
||||
print 'Failed to download last state:', e
|
||||
sys.exit(1)
|
||||
|
||||
state = read_last_state()
|
||||
if state is None:
|
||||
state = cur_state - 1
|
||||
return
|
||||
|
||||
print 'connecting'
|
||||
database.connect()
|
||||
database.create_tables([Change, Seen, User], safe=True)
|
||||
database.create_tables([Change, Seen, User, State], safe=True)
|
||||
|
||||
for i in range(state + 1, cur_state + 1):
|
||||
try:
|
||||
state = State.get(id=1)
|
||||
except State.DoesNotExist:
|
||||
state = State()
|
||||
state.state = cur_state - 1
|
||||
|
||||
for i in range(state.state + 1, cur_state + 1):
|
||||
print i
|
||||
try:
|
||||
changesets = download_replication(i)
|
||||
|
@ -278,5 +265,10 @@ if __name__ == '__main__':
|
|||
except Exception as e:
|
||||
print 'Failed to download and process replication {0}: {1}'.format(i, e)
|
||||
raise e
|
||||
write_last_state(i)
|
||||
state.state = i
|
||||
state.save()
|
||||
update_user_ranks()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
process()
|
|
@ -1,54 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
import sys, os, bz2, urllib2, json
|
||||
from bz2file import BZ2File
|
||||
from db import *
|
||||
from lxml import etree
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def process_notes():
|
||||
response = urllib2.urlopen('http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2')
|
||||
# Parsing bz2 through a temporary file
|
||||
tmpfile = os.path.join(os.path.dirname(sys.argv[0]), 'parse-notes.tmp.osn.bz2')
|
||||
with open(tmpfile, 'wb') as f:
|
||||
while True:
|
||||
chunk = response.read(512*1024)
|
||||
if not chunk:
|
||||
break
|
||||
f.write(chunk)
|
||||
|
||||
print 'Parsing notes'
|
||||
try:
|
||||
# In case of an error remove the temporary file
|
||||
database.connect()
|
||||
with database.atomic():
|
||||
with BZ2File(tmpfile) as f:
|
||||
for event, element in etree.iterparse(f):
|
||||
if element.tag == 'note':
|
||||
if len(element) > 0 and '#mapsme' in element[0].text:
|
||||
note_id = element.get('id')
|
||||
try:
|
||||
ch = Change.get(Change.changeset == note_id, Change.action == 'n')
|
||||
if element[-1].get('action') == 'closed':
|
||||
# TODO: mark as closed
|
||||
last_ts = element[-1].get('timestamp')
|
||||
except Change.DoesNotExist:
|
||||
print 'Found new note', note_id, 'by', element[0].get('user')
|
||||
# No such note, add it
|
||||
ch = Change()
|
||||
ch.changeset = note_id
|
||||
ch.user = element[0].get('user') if element[0].get('uid') else 'Anonymous'
|
||||
ch.version = ''
|
||||
ch.timestamp = element[0].get('timestamp')
|
||||
ch.action = 'n'
|
||||
changes = [(element.get('lon'), element.get('lat')), {'name': element[0].text}]
|
||||
ch.changes = json.dumps(changes)
|
||||
ch.save()
|
||||
element.clear()
|
||||
finally:
|
||||
os.remove(tmpfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print 'Downloading notes'
|
||||
process_notes()
|
49
mmwatch/server/parse_notes.py
Normal file
49
mmwatch/server/parse_notes.py
Normal file
|
@ -0,0 +1,49 @@
|
|||
#!/usr/bin/env python
|
||||
import bz2, urllib2, json
|
||||
from tempfile import TemporaryFile
|
||||
from bz2file import BZ2File
|
||||
from db import *
|
||||
from lxml import etree
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def process_notes():
|
||||
response = urllib2.urlopen('http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2')
|
||||
# Parsing bz2 through a temporary file
|
||||
tmpfile = TemporaryFile()
|
||||
while True:
|
||||
chunk = response.read(512*1024)
|
||||
if not chunk:
|
||||
break
|
||||
tmpfile.write(chunk)
|
||||
tmpfile.seek(0)
|
||||
|
||||
database.connect()
|
||||
with database.atomic():
|
||||
with BZ2File(tmpfile) as f:
|
||||
for event, element in etree.iterparse(f):
|
||||
if element.tag == 'note':
|
||||
if len(element) > 0 and '#mapsme' in element[0].text:
|
||||
note_id = element.get('id')
|
||||
try:
|
||||
ch = Change.get(Change.changeset == note_id, Change.action == 'n')
|
||||
if element[-1].get('action') == 'closed':
|
||||
# TODO: mark as closed
|
||||
last_ts = element[-1].get('timestamp')
|
||||
except Change.DoesNotExist:
|
||||
print 'Found new note', note_id, 'by', element[0].get('user')
|
||||
# No such note, add it
|
||||
ch = Change()
|
||||
ch.changeset = note_id
|
||||
ch.user = element[0].get('user') if element[0].get('uid') else 'Anonymous'
|
||||
ch.version = ''
|
||||
ch.timestamp = element[0].get('timestamp')
|
||||
ch.action = 'n'
|
||||
changes = [(element.get('lon'), element.get('lat')), {'name': element[0].text}]
|
||||
ch.changes = json.dumps(changes)
|
||||
ch.save()
|
||||
element.clear()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
process_notes()
|
0
mmwatch/www/__init__.py
Normal file
0
mmwatch/www/__init__.py
Normal file
|
@ -2,3 +2,4 @@ peewee>=2.8.0
|
|||
lxml
|
||||
flask>=0.11
|
||||
flask-Compress
|
||||
bz2file
|
||||
|
|
Loading…
Add table
Reference in a new issue