"""Demo script for downloading data through the Data Portability API. Gets an OAuth token, initiates a portability archive, polls status, gets signed URLs, downloads archive, and then resets authorization. Usage: python3 data-portability-quickstart.py """ from collections.abc import Sequence import io import os import time from typing import Generator import urllib import zipfile from google.oauth2 import credentials import google_auth_oauthlib.flow from googleapiclient import discovery import googleapiclient.errors # The name of a file that contains the OAuth 2.0 information for this # application, including the client_id and client_secret. For this script, this # should be a desktop application OAuth client. CLIENT_SECRETS_FILE = 'client_secrets.json' # A list of Data Portability resources that we want to request. RESOURCES = ['myactivity.search', 'myactivity.youtube'] DATAPORTABILITY_API_SERVICE_NAME = 'dataportability' API_VERSION = 'v1' # There is a one to one mapping between Data Portability resources and # dataportability OAuth scopes. The scope code is the resource name plus a # prefix. SCOPE_PREFIX = 'https://www.googleapis.com/auth/dataportability.' def get_credentials( resources: Sequence[str], ) -> (credentials.Credentials, Sequence[str]): """Gets OAuth 2.0 credentials using an installed app OAuth flow. This generates a link for the user to consent to some or all of the requested resources. In a production environment, the best practice is to save a refresh token in Cloud Storage because the access token can expire before the portability archive job completes. Args: resources: A list of dataportability resource IDs. These are OAuth scope codes from https://developers.google.com/data-portability/reference/rest/v1/portabilityArchive/initiate#authorization-scopes without the 'https://www.googleapis.com/auth/dataportability.' prefix. Returns: A tuple of credentials containing an access token and a list of resources for which the user has granted consent. """ flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file( CLIENT_SECRETS_FILE, [SCOPE_PREFIX + r for r in resources], ) try: return flow.run_local_server(), resources except Warning as warn: # We should gracefully handle the user only consenting to a subset of the # requested scopes. return credentials.Credentials(warn.token['access_token']), [ scope.removeprefix(SCOPE_PREFIX) for scope in warn.new_scope ] def get_api_interface( creds: credentials.Credentials, ) -> discovery.Resource: """Gets an interface to the Data Portability API.""" return discovery.build( serviceName=DATAPORTABILITY_API_SERVICE_NAME, version=API_VERSION, credentials=creds, ) def initiate_portability_archive( dataportability: discovery.Resource, resources: Sequence[str] ) -> str: """Initiates a portability archive for the requesteed resources.""" initiate = dataportability.portabilityArchive().initiate( body={'resources': [resources]} ) print('\n', initiate.method, initiate.body, initiate.uri, '\n') initiate_response = initiate.execute() print(initiate_response, '\n') return initiate_response['archiveJobId'] def exponential_backoff( delay: float, max_delay: float, multiplier: float ) -> Generator[None, None, None]: while True: time.sleep(delay) yield delay = min(delay * multiplier, max_delay) def poll_get_portability_archive_state( dataportability: discovery.Resource, job_id: str ) -> Sequence[str]: """Calls dataportability's getPortabilityArchiveState endpoint.""" get_state = dataportability.archiveJobs().getPortabilityArchiveState( name='archiveJobs/{}/portabilityArchiveState'.format(job_id) ) print( 'Polling archive status while server indicates state is in progress...\n', get_state.method, get_state.uri, ) for _ in exponential_backoff(3, 3600, 1.5): state = get_state.execute() print(state) if state['state'] != 'IN_PROGRESS': return state['urls'] def reset_authorization(dataportability: discovery.Resource) -> None: """Calls dataportability's reset endpoint.""" reset = dataportability.authorization().reset() print('\n', reset.method, reset.uri, '\n') initiate_response = reset.execute() print(initiate_response, '\n') def main() -> None: # When running locally, disable OAuthlib's HTTPs verification. When # running in production *do not* leave this option enabled. os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' creds, resources = get_credentials(RESOURCES) print('\nObtained OAuth credentials for resources: ', ', '.join(resources)) dataportability = get_api_interface(creds) try: job_id = initiate_portability_archive(dataportability, resources) print('Successfully initiated data archive job with ID', job_id, '\n') urls = poll_get_portability_archive_state(dataportability, job_id) for url in urls: print('\nData archive is ready. Beginning download.') ufile = urllib.request.urlopen(url) print('Download complete! Extracting archive...\n') zf = zipfile.ZipFile(io.BytesIO(ufile.read()), 'r') for f in zf.filelist: print(f) # Save extracted files in the current directory. zf.extractall() except googleapiclient.errors.HttpError as e: print(e) finally: # If data retrieval fails, call reset in case any resources are exhausted. reset_authorization(dataportability) if __name__ == '__main__': main()