Source code for rudra.data_store.bigquery.pypi_bigquery

"""Pypi bigquery implementation."""
import time
import os
from collections import Counter

from rudra.data_store.bigquery.base import BigqueryBuilder
from rudra.utils.pypi_parser import pip_req
from rudra.data_store.bigquery.base import DataProcessing
from rudra.utils.validation import BQValidation
from rudra import logger


[docs]class PyPiBigQuery(BigqueryBuilder): """PyPiBigQuery Implementation.""" def __init__(self, *args, **kwargs): """Initialize PyPiBigQuery object.""" super().__init__(*args, **kwargs) self.query_job_config.use_legacy_sql = False self.query_job_config.use_query_cache = True self.query_job_config.timeout_ms = 60000 self.query = """ SELECT con.content AS content FROM `bigquery-public-data.github_repos.contents` AS con INNER JOIN (SELECT files.id AS id FROM `bigquery-public-data.github_repos.languages` AS langs INNER JOIN `bigquery-public-data.github_repos.files` AS files ON files.repo_name = langs.repo_name WHERE REGEXP_CONTAINS(TO_JSON_STRING(language), r'(?i)python') AND files.path LIKE '%requirements.txt' ) AS L ON con.id = L.id; """
[docs]class PyPiBigQueryDataProcessing(DataProcessing): """Implementation data processing for pypi bigquery.""" def __init__(self, big_query_instance=None, s3_client=None, file_name='collated.json'): """Initialize the BigQueryDataProcessing object.""" super().__init__(s3_client) self.big_query_instance = big_query_instance or PyPiBigQuery() self.big_query_content = list() self.counter = Counter() self.bucket_name = self.s3_client.bucket_name \ if self.s3_client else'developer-analytics-audit-report' self.filename = '{}/big-query-data/{}'.format( os.getenv('DEPLOYMENT_PREFIX', 'dev'), file_name)
[docs] def process(self, validate=False): """Process Pypi Bigquery response data.""" bq_validation = BQValidation() logger.info("Running Bigquery for pypi synchronously") self.big_query_instance.run_query_sync() start_process_time = time.monotonic() for idx, obj in enumerate(self.big_query_instance.get_result()): start = time.monotonic() content = obj.get('content') packages = [] if content: try: packages = sorted( {p for p in pip_req.parse_requirements(content)}) if validate: packages = sorted( bq_validation.validate_pypi(packages)) except Exception as _exc: logger.error("IGNORE: {}".format(_exc)) logger.error( "Failed to parse content data {}".format(content)) if packages: pkg_string = ', '.join(packages) logger.info("PACKAGES: {}".format(pkg_string)) self.counter.update([pkg_string]) logger.info("Processed content in time: {} counter:{}".format( (time.monotonic() - start), idx)) logger.info("Processed All the manifests in time: {}".format( time.monotonic() - start_process_time)) logger.info("updating file content") self.update_s3_bucket(data={'pypi': dict(self.counter.most_common())}, bucket_name=self.bucket_name, filename=self.filename) logger.info("Succefully Processed the PyPiBigQuery")