diff --git a/tools/python/oauth2-flow/oauth2_flow.py b/tools/python/oauth2-flow/oauth2_flow.py new file mode 100644 index 0000000000..e05b819578 --- /dev/null +++ b/tools/python/oauth2-flow/oauth2_flow.py @@ -0,0 +1,226 @@ +import http.client +import logging +import re +import urllib.parse +from http.cookies import SimpleCookie + + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)-5s] %(message)s') + +# Prod enviroment +""" +osmHost = "www.openstreetmap.org" +OAuthClientId = "nw9bW3nZ-q99SXzgnH-dlED3ueDSmFPtxl33n3hDwFU" +clientSecret = "nIxwFx1NXIx9lKoNmb7lAoHd9ariGMf46PtU_YG558c" +redirectUri = "om://oauth2/osm/callback" +response_type = "code" +scope = "read_prefs" +""" + +# Dev enviroment +osmHost = "master.apis.dev.openstreetmap.org" +OAuthClientId = "QSh_IksPC78W7EgRxSRDw_F8oulQghpL2QcTLHrP4z8" +OAuthClientSecret = "NMU8BNkUGg_R0KhGtrrSJRREBbMBM2133Gi-cZWmVmc" +OAuthRedirectUri = "om://oauth2/osm/callback" +OAuthResponseType = "code" +OAuthScope = "read_prefs write_api write_notes" +test_login = "OrganicMapsTestUser" +test_password = "12345678" + +def FetchSessionId(): + logging.info("Getting initial cookie to login ...") + + conn = http.client.HTTPSConnection(osmHost) + conn.request("GET", "/login?cookie_test=true") + res = conn.getresponse() + + cookies = SimpleCookie() + cookies.load(res.headers['Set-Cookie']) + + authToken = FindAuthenticityToken(res.read().decode("utf-8")) + + if authToken is None: + raise Exception("Can't find 'authenticity_token'") + + logging.info(f"Parsed authToken = {authToken}") + logging.info("Got cookies: %s",res.headers['Set-Cookie']) + + return cookies, authToken + +def LoginUserPassword(username, password, cookies, authToken): + logging.info("Logging in with username and password ...") + + # Do POST with credentials + payload = { + "authenticity_token": authToken, + "username": username, + "password": password, + "remember_me": "yes" + } + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Cookie": encodeCookies(cookies), + "Origin": f"https://{osmHost}", + "Referer": f"https://{osmHost}/login?cookie_test=true" + } + + conn2 = http.client.HTTPSConnection(osmHost) + conn2.request("POST", "/login?cookie_test=true", urllib.parse.urlencode(payload), headers) + res = conn2.getresponse() + + if res.status >= 400: + print(res.status) + print(res.headers) + print(res.read().decode('utf-8')) + raise Exception("Invalid login or password") + + logging.info("Logged in successfully") + + return cookies + + +def FetchRequestToken(cookies): + logging.info("Loading OAuth2 auth page ...") + + query = urllib.parse.urlencode({ + "client_id": OAuthClientId, + "redirect_uri": OAuthRedirectUri, + "response_type": OAuthResponseType, + "scope": OAuthScope + }) + + headers = { + "Cookie": encodeCookies(cookies), + } + + conn = http.client.HTTPSConnection(osmHost) + conn.request("GET", f"/oauth2/authorize?{query}", headers=headers) + + res = conn.getresponse() + + if res.status >= 400: + print(res.status) + print(res.headers) + print(res.read().decode('utf-8')) + raise Exception("Can't load OAuth2 page") + + elif res.status == 302: + logging.info("User already accepted OAuth2 request!") + redirectUri = res.headers['Location'] + logging.info(f"Accepted OAuth2: {redirectUri}") + oauthCode = FindOauthCode(redirectUri) + + if not oauthCode: + raise Exception(f"Can't find 'code' in redirect URI: {redirectUri}") + + return oauthCode + else: + respBody = res.read().decode("utf-8") + authToken = FindAuthenticityToken(respBody) + if not authToken: + print(res.status) + print(res.headers) + print(respBody) + raise Exception("Invalid authToken '{authToken}'") + logging.info(f"Parsed authToken = {authToken}") + + return SendAuthRequest(authToken, cookies) + +def SendAuthRequest(authToken, cookies): + logging.info("Accepting OAuth2 ...") + # Do POST + payload = { + "authenticity_token": authToken, + "client_id": OAuthClientId, + "redirect_uri": OAuthRedirectUri, + "response_type": OAuthResponseType, + "scope": OAuthScope + } + + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Cookie": encodeCookies(cookies), + "Origin": f"https://{osmHost}", + } + + conn2 = http.client.HTTPSConnection(osmHost) + conn2.request("POST", f"/oauth2/authorize", urllib.parse.urlencode(payload), headers=headers) + + res = conn2.getresponse() + if res.status != 302: + print(res.status) + print(res.headers) + print(res.read().decode('utf-8')) + raise Exception("Invalid response. Expected redirect") + + redirectUri = res.headers['Location'] + logging.info(f"Accepted OAuth2: {redirectUri}") + oauthCode = FindOauthCode(redirectUri) + + if not oauthCode: + raise Exception(f"Can't find 'code' in redirect URI: {redirectUri}") + + return oauthCode + +def FinishAuthorization(code): + payload = urllib.parse.urlencode({ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": OAuthRedirectUri, + "scope": OAuthScope, + "client_id": OAuthClientId, + "client_secret": OAuthClientSecret + }) + + headers = { 'content-type': "application/x-www-form-urlencoded" } + + conn = http.client.HTTPSConnection(osmHost) + conn.request("POST", "/oauth2/token", payload, headers) + + res = conn.getresponse() + if res.status >= 400: + print(res.status) + print(res.headers) + print(res.read().decode('utf-8')) + raise Exception("Error getting OAuth2 token") + + return res.read().decode("utf-8") + + +def FindAuthenticityToken(htmlCode): + # search for + regex = re.compile(r"\