From eaeb5d0580be3d234175cdef971d3f7421559a3b Mon Sep 17 00:00:00 2001
From: Ilya Zverev
Date: Tue, 14 Jun 2016 17:21:48 +0300
Subject: [PATCH] Add notes processing to the regular pipeline

---
 mmwatch/db.py                 | 14 +++++++++++++-
 mmwatch/process.py            | 11 +++++++++++
 mmwatch/server/parse_notes.py | 35 +++++++++++++++++++++++++++++++----
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/mmwatch/db.py b/mmwatch/db.py
index 8e345b7..aa355f0 100644
--- a/mmwatch/db.py
+++ b/mmwatch/db.py
@@ -1,5 +1,6 @@
 import config
 import json
+import time
 from peewee import *
 from playhouse.db_url import connect
 
@@ -72,4 +73,15 @@ class User(BaseModel):
 
 class State(BaseModel):
     """A model for storing replication state."""
-    state = IntegerField()
+    state = IntegerField()  # Replication state
+    notes = IntegerField()  # Size (Content-Length) of the notes dump seen last
+    hourly = IntegerField()  # Unix timestamp of the last hourly run
+
+    def run_hourly(self):
+        """Returns True if over an hour has passed since the last hourly run."""
+        timestamp = time.time()
+        return self.hourly + 3600 < timestamp
+
+    def update_hourly(self):
+        """Sets the hourly timestamp to the current time; does not save."""
+        self.hourly = int(time.time())
diff --git a/mmwatch/process.py b/mmwatch/process.py
index 7bb8c85..8d16a69 100755
--- a/mmwatch/process.py
+++ b/mmwatch/process.py
@@ -9,3 +9,14 @@ if os.path.exists(VENV_DIR):
 
 from server import mapsme_process
 mapsme_process.process()
+
+from db import State
+from server import parse_notes
+# By this time the connection has already been established by mapsme_process.
+st = State.get(State.id == 1)
+if st.run_hourly():
+    parse_notes.process_notes()
+    # Re-read the state: process_notes() may have modified and saved it.
+    st = State.get(State.id == 1)
+    st.update_hourly()
+    st.save()
diff --git a/mmwatch/server/parse_notes.py b/mmwatch/server/parse_notes.py
index 5306a6c..4210c85 100644
--- a/mmwatch/server/parse_notes.py
+++ b/mmwatch/server/parse_notes.py
@@ -3,10 +3,35 @@ import urllib2
 import json
 from tempfile import TemporaryFile
 from bz2file import BZ2File
-from db import database, Change
+from db import database, Change, State
 from lxml import etree
 from datetime import datetime
 
+NOTES_URI = 'http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2'
+
+
+def check_update():
+    """Checks whether the notes dump on the server has changed.
+
+    Sends a HEAD request for NOTES_URI and compares its Content-Length to
+    State.notes. If the size is positive and differs, stores the new size
+    and returns True. Returns False otherwise, including on any error.
+    """
+    request = urllib2.Request(NOTES_URI)
+    request.get_method = lambda: 'HEAD'
+    try:
+        response = urllib2.urlopen(request)
+        length = int(response.info()['Content-Length'])
+        if length > 0:
+            st = State.get(State.id == 1)
+            if length != st.notes:
+                st.notes = length
+                st.save()
+                return True
+    except Exception:
+        pass  # Best effort: any failure is treated as "no update"
+    return False
+
 
 def hour_difference(start, timestamp):
     last_ts = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
@@ -15,7 +40,11 @@ def hour_difference(start, timestamp):
 
 
 def process_notes():
-    response = urllib2.urlopen('http://planet.openstreetmap.org/notes/planet-notes-latest.osn.bz2')
+    database.connect()
+    if not check_update():
+        return
+
+    response = urllib2.urlopen(NOTES_URI)
     # Parsing bz2 through a temporary file
     tmpfile = TemporaryFile()
     while True:
@@ -25,7 +54,6 @@ def process_notes():
         tmpfile.write(chunk)
     tmpfile.seek(0)
 
-    database.connect()
     with database.atomic():
         with BZ2File(tmpfile) as f:
             for event, element in etree.iterparse(f):
@@ -53,6 +81,5 @@ def process_notes():
                             ch.save()
                     element.clear()
 
-
 if __name__ == '__main__':
     process_notes()