Add notes processing to the regular pipeline

This commit is contained in:
Ilya Zverev 2016-06-14 17:21:48 +03:00
parent d606b4e099
commit eaeb5d0580
3 changed files with 48 additions and 5 deletions

View file

@ -1,5 +1,6 @@
import config
import json
import time
from peewee import *
from playhouse.db_url import connect
@ -72,4 +73,14 @@ class User(BaseModel):
class State(BaseModel):
"""A model for storing replication state."""
state = IntegerField()
state = IntegerField() # Replication state
notes = IntegerField() # File size of notes dump
hourly = IntegerField() # Unix Timestamp of last object check
def run_hourly(self):
"""Returns True if it's time to run hourly tasks."""
timestamp = time.time()
return self.hourly + 3600 < timestamp
def update_hourly(self):
self.hourly = int(time.time())

View file

@ -9,3 +9,14 @@ if os.path.exists(VENV_DIR):
from server import mapsme_process
mapsme_process.process()
from db import State
from server import parse_notes
# By this time connection is already established by mapsme_process.
st = State.get(State.id == 1)
if st.run_hourly():
parse_notes.process_notes()
# State could have been modified.
st = State.get(State.id == 1)
st.update_hourly()
st.save()

View file

@ -3,10 +3,29 @@ import urllib2
import json
from tempfile import TemporaryFile
from bz2file import BZ2File
from db import database, Change
from db import database, Change, State
from lxml import etree
from datetime import datetime
NOTES_URI = 'http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2'
def check_update():
request = urllib2.Request(NOTES_URI)
request.get_method = lambda: 'HEAD'
try:
response = urllib2.urlopen(request)
length = int(response.info()['Content-Length'])
if length > 0:
st = State.get(State.id == 1)
if length != st.notes:
st.notes = length
st.save()
return True
except:
pass
return False
def hour_difference(start, timestamp):
last_ts = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
@ -15,7 +34,11 @@ def hour_difference(start, timestamp):
def process_notes():
response = urllib2.urlopen('http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2')
database.connect()
if not check_update():
return
response = urllib2.urlopen(NOTES_URI)
# Parsing bz2 through a temporary file
tmpfile = TemporaryFile()
while True:
@ -25,7 +48,6 @@ def process_notes():
tmpfile.write(chunk)
tmpfile.seek(0)
database.connect()
with database.atomic():
with BZ2File(tmpfile) as f:
for event, element in etree.iterparse(f):
@ -53,6 +75,5 @@ def process_notes():
ch.save()
element.clear()
if __name__ == '__main__':
process_notes()