Source code for watershed

"""
Virtual Watershed Adaptor. Handles fetching and searching of data, model
run initialization, and pushing of data. Does this for associated metadata
as well. Each file that's either taken as input or produced as output gets
associated metadata.
"""

import configparser
from datetime import datetime
import logging
import json
import os
import requests
import urllib

from string import Template


def makeFGDCMetadata(dataFile, config, modelRunUUID):
    """
    For a single `dataFile`, write the XML FGDC metadata.

    The XML is produced by filling the ``string.Template`` file found at
    ``config['FGDC Metadata']['template_path']``.

    Args:
        dataFile: path of the data file being described. It is substituted
            verbatim as ``$filename`` and is stat'ed to get its size.
        config: mapping of section name to a mapping of options. The
            'FGDC Metadata' and 'Common' sections are read.
        modelRunUUID: substituted as ``$model_run_uuid``.

    Returns:
        XML metadata string

    Raises:
        KeyError: if a required section or option is missing from `config`,
            or the template uses a placeholder that is not supplied here.
    """
    # Options copied straight from the config into same-named placeholders.
    fgdc_keys = (
        'procdate', 'begdate', 'enddate', 'themekey', 'researcherName',
        'mailing_address', 'city', 'state', 'zipCode', 'researcherPhone',
        'researcherEmail', 'rowcount', 'columncount', 'latres', 'longres',
        'mapUnits',
    )
    common_keys = ('westBnd', 'eastBnd', 'northBnd', 'southBnd', 'model')

    try:
        # NOTE(review): `/` truncates on Python 2 but is true division on
        # Python 3, so the rendered size differs between interpreters.
        filesizeMB = str(os.stat(dataFile).st_size / 1000000)
    except OSError:
        # The file does not exist or cannot be stat'ed.
        filesizeMB = "NA"

    fgdcConfig = config['FGDC Metadata']
    commonConfig = config['Common']

    # Read the template inside a context manager so the handle is always
    # released, even when reading fails.
    with open(fgdcConfig['template_path'], 'r') as template_file:
        template = Template(template_file.read())

    values = {
        'filename': dataFile,
        'filesizeMB': filesizeMB,
        'model_run_uuid': modelRunUUID,
    }
    for key in fgdc_keys:
        values[key] = fgdcConfig[key]
    for key in common_keys:
        values[key] = commonConfig[key]

    return template.substitute(values)
def makeWatershedMetadata(dataFile, config, parentModelRunUUID, modelRunUUID,
                          model_set, description="", model_vars="",
                          fgdcMetadata="", start_datetime=None,
                          end_datetime=None):
    """
    For a single `dataFile`, write the corresponding Virtual Watershed JSON
    metadata.

    Take the modelRunUUID from the result of initializing a new model run in
    the virtual watershed. `model_set` must be either 'inputs' or 'outputs'.

    The output is produced by filling the ``string.Template`` file found at
    ``config['Watershed Metadata']['template_path']``.

    Args:
        dataFile: path of the data file being described. Only its extension
            and base name are used.
        config: mapping of section name to a mapping of options. The
            'Watershed Metadata' and 'Common' sections are read.
        parentModelRunUUID: UUID of the parent model run. It is also used to
            build the ``$inputFilePath`` value.
        modelRunUUID: UUID of this model run.
        model_set: 'inputs' or 'outputs'.
        description: free text substituted as ``$description``.
        model_vars: substituted as ``$model_vars``.
        fgdcMetadata: FGDC XML string. Newlines and tabs are replaced by the
            two-character sequences ``\\n`` and ``\\t`` before substitution.
        start_datetime, end_datetime: ``datetime`` objects. If either one is
            None, both fall back to fixed 1970-10-01 placeholder values.

    Returns:
        JSON metadata string

    Raises:
        AssertionError: if `model_set` is not 'inputs' or 'outputs'.
        TypeError: if both datetimes are given but are not ``datetime``
            instances.
        KeyError: if a required section or option is missing from `config`.
    """
    # posixpath keeps the server-side path '/'-separated on every client OS.
    import posixpath

    # Raised explicitly instead of using `assert` so the check survives
    # `python -O`. AssertionError is kept for backward compatibility.
    if model_set not in ("inputs", "outputs"):
        raise AssertionError(
            "parameter model_set must be either 'inputs' or 'outputs', "
            "not %s" % model_set)

    RECS = "1"
    FEATURES = "1"

    # Choose service, taxonomy and mimetype values from the file extension.
    _, ext = os.path.splitext(dataFile)
    if ext == '.tif':
        wcs = 'wcs'
        wms = 'wms'
        tax = 'geoimage'
        ext = 'tif'
        mimetype = 'application/x-zip-compressed'
        model_set_type = 'vis'
    else:
        wcs = ''
        wms = ''
        tax = 'file'
        ext = 'bin'
        mimetype = 'application/x-binary'
        model_set_type = 'binary'

    # Resolve the time range before touching the filesystem.
    if start_datetime is None or end_datetime is None:
        # If one of the datetimes is missing, both are replaced.
        start_datetime = "1970-10-01 00:00:00"
        end_datetime = "1970-10-01 01:00:00"
    elif (isinstance(start_datetime, datetime) and
          isinstance(end_datetime, datetime)):
        start_datetime = start_datetime.strftime('%Y-%m-%d %H:%M:%S')
        end_datetime = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
    else:
        raise TypeError(
            "Either pass no start/end datetime or pass a datetime object")

    basename = os.path.basename(dataFile)

    watershedConfig = config['Watershed Metadata']
    commonConfig = config['Common']

    # The path is laid out as <root>/<first two chars of parent uuid>/
    # <parent uuid>/<file name>.
    inputFilePath = posixpath.join("/geodata/watershed-data",
                                   parentModelRunUUID[:2],
                                   parentModelRunUUID,
                                   basename)

    # Read the template inside a context manager so the handle is released.
    with open(watershedConfig['template_path'], 'r') as template_file:
        template = Template(template_file.read())

    # Escape newlines and tabs in the XML metadata.
    # NOTE(review): quotes and backslashes pass through unchanged -- confirm
    # the template and the XML never combine into an invalid JSON string.
    fgdcMetadata = fgdcMetadata.replace('\n', '\\n').replace('\t', '\\t')

    return template.substitute(
        # determined by file ext, set within function
        wcs=wcs,
        wms=wms,
        tax=tax,
        ext=ext,
        mimetype=mimetype,
        model_set_type=model_set_type,
        # passed as args to parent function
        model_run_uuid=modelRunUUID,
        model_vars=model_vars,
        description=description,
        model_set=model_set,
        fgdcMetadata=fgdcMetadata,
        parent_model_run_uuid=parentModelRunUUID,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        # derived from parent function args
        basename=basename,
        inputFilePath=inputFilePath,
        # given in config file
        modelname=commonConfig['model'],
        state=watershedConfig['state'],
        model_set_taxonomy=commonConfig['model_set_taxonomy'],
        orig_epsg=watershedConfig['orig_epsg'],
        westBnd=commonConfig['westBnd'],
        eastBnd=commonConfig['eastBnd'],
        northBnd=commonConfig['northBnd'],
        southBnd=commonConfig['southBnd'],
        epsg=watershedConfig['epsg'],
        location=watershedConfig['location'],
        # static default values defined at top of func
        recs=RECS,
        features=FEATURES,
    )
class VWClient:
    """
    Client class for interacting with a Virtual Watershed (VW). A VW is
    essentially a structured database with certain rules for its metadata
    and for uploading or inserting data.

    SECURITY NOTE: every request in this class is sent with ``verify=False``,
    which disables TLS certificate verification. This is left unchanged so
    existing deployments keep working; review before using the client over
    an untrusted network.
    """

    def __init__(self, ip_address, uname, passwd):
        """
        Initialize a new connection to the virtual watershed.

        Args:
            ip_address: host part used to build every service URL.
            uname: user name for HTTP basic auth.
            passwd: password for HTTP basic auth.

        Raises:
            requests.HTTPError: if the credential check returns an HTTP
                error status.
        """
        app_url = "https://" + ip_address + "/apps/my_app"

        # Check our credentials before storing anything on the instance.
        r = requests.get(app_url + "/auth", auth=(uname, passwd),
                         verify=False)
        r.raise_for_status()

        self.uname = uname
        self.passwd = passwd

        # Initialize URLS used by class methods
        self.insertDatasetUrl = app_url + "/datasets"
        self.dataUploadUrl = app_url + "/data"
        self.uuidCheckUrl = app_url + "/checkmodeluuid"
        self.searchUrl = app_url + "/search/datasets.json?version=3"
        self.new_run_url = app_url + "/newmodelrun"

        # number of times to re-try an http request
        self._retry_num = 3

    def _build_search_url(self, **kwargs):
        """Return the search URL with one ``&key=value`` pair per kwarg."""
        # `items()` exists on both Python 2 and 3, and `%s` converts
        # non-string values, so no explicit str() call is needed.
        query = "".join("&%s=%s" % (key, val) for key, val in kwargs.items())
        return self.searchUrl + query

    def _send_with_retries(self, send):
        """
        Call `send` up to ``self._retry_num`` times until its response
        carries no HTTP error status, and return that response.

        Only ``requests.HTTPError`` triggers a retry; any other exception
        propagates immediately. After the last failed attempt the most
        recent HTTPError is re-raised so the caller sees the real status.
        """
        last_error = None
        for _ in range(self._retry_num):
            try:
                result = send()
                result.raise_for_status()
                return result
            except requests.HTTPError as err:
                last_error = err

        if last_error is None:
            # No attempt was made (retry count of zero).
            last_error = requests.HTTPError()
        raise last_error

    def initialize_model_run(self, description):
        """
        Initialize a new model run.

        Args:
            description: non-empty description of the new model run.

        Returns:
            (str) a newly-initialized model_run_uuid, taken from the
            response text.

        Raises:
            AssertionError: if `description` is empty.
            requests.HTTPError: if the server returns an HTTP error status.
        """
        # Raised explicitly so the check is not stripped by `python -O`;
        # AssertionError is kept for backward compatibility.
        if not description:
            raise AssertionError(
                "You must provide a description for your new model run")

        data = {'description': description}
        result = requests.post(self.new_run_url, data=json.dumps(data),
                               auth=(self.uname, self.passwd), verify=False)
        result.raise_for_status()

        return result.text

    def search(self, **kwargs):
        """
        Search the VW for JSON metadata records with matching parameters.
        Use key, value pairs as specified in the `Virtual Watershed
        Documentation
        <http://129.24.196.43//docs/stable/search.html#search-objects>`_

        Returns:
            a QueryResult wrapping the decoded JSON response
        """
        r = requests.get(self._build_search_url(**kwargs), verify=False)

        return QueryResult(r.json())

    def download(self, url, outFile):
        """
        Download a file from the VW using `url` to `outFile` on local disk.

        The body is streamed to disk in binary mode so that non-text data
        is written unchanged.

        Returns:
            None

        Raises:
            AssertionError: if the response status is not 200.
        """
        response = requests.get(url, stream=True, verify=False)
        try:
            # AssertionError is kept for backward compatibility, but raised
            # explicitly so the check survives `python -O`.
            if response.status_code != 200:
                raise AssertionError("Download Failed!")

            with open(outFile, 'wb') as out:
                for chunk in response.iter_content(chunk_size=65536):
                    out.write(chunk)
        finally:
            response.close()

        return None

    def insert_metadata(self, watershedMetadata):
        """
        Insert metadata to the virtual watershed by sending
        `watershedMetadata` in a PUT request to the dataset insertion URL.
        The request is retried on HTTP error statuses.

        Returns:
            the ``requests`` response of the successful request

        Raises:
            requests.HTTPError: if every attempt returns an HTTP error
                status.
        """
        def send():
            result = requests.put(self.insertDatasetUrl,
                                  data=watershedMetadata,
                                  auth=(self.uname, self.passwd),
                                  verify=False)
            logging.debug(result.content)
            return result

        return self._send_with_retries(send)

    def upload(self, modelRunUUID, dataFilePath):
        """
        Upload data for a given modelRunUUID to the VW. The request is
        retried on HTTP error statuses.

        Returns:
            the ``requests`` response of the successful request

        Raises:
            requests.HTTPError: if every attempt returns an HTTP error
                status.
        """
        # currently 'name' is unused
        dataPayload = {'name': os.path.basename(dataFilePath),
                       'modelid': modelRunUUID}

        def send():
            # Re-open the file on each attempt so every retry sends it from
            # the start, and close it as soon as the request finishes.
            with open(dataFilePath, 'rb') as data_file:
                return requests.post(self.dataUploadUrl, data=dataPayload,
                                     files={'file': data_file},
                                     auth=(self.uname, self.passwd),
                                     verify=False)

        return self._send_with_retries(send)
class QueryResult:
    """
    Thin wrapper around the decoded JSON answer to a records request.

    The url built by ``VWClient.search`` and ``VWClient.fetch_records``
    yields a JSON document with three base fields: ``total``, ``subtotal``,
    and ``results``. Instances of this class expose those fields as
    read-only properties and are what the aforementioned VWClient functions
    hand back.
    """

    def __init__(self, json):
        # Keep the decoded response as-is; the properties index into it.
        self.json = json

    @property
    def total(self):
        """
        The value of the ``total`` field: how many records `known by the
        virtual watershed` matched the parameters given to either the
        fetch_records or search function.
        """
        response = self.json
        return response['total']

    @property
    def subtotal(self):
        """
        The value of the ``subtotal`` field: how many records the virtual
        watershed actually transferred.
        """
        response = self.json
        return response['subtotal']

    @property
    def records(self):
        """
        The value of the ``results`` field: the records sent back by the
        Virtual Watershed for the query built by either ``search`` or
        ``fetch_records``.
        """
        response = self.json
        return response['results']
def default_vw_client(config_file="default.conf"):
    """
    Build a VWClient from the credentials stored in `config_file`.

    The 'Common' section of the file supplies the ``watershedIP``, ``user``
    and ``passwd`` options.

    Returns:
        VWClient connected to the ip address given in config_file
    """
    common_section = get_config(config_file)['Common']

    ip_address = common_section['watershedIP']
    user_name = common_section['user']
    password = common_section['passwd']

    return VWClient(ip_address, user_name, password)
def get_config(config_file=None):
    """
    Provide user with a ConfigParser that has read the `config_file`.

    Args:
        config_file: path of the configuration file. When None, the file
            ``../default.conf`` relative to this module's directory is used.

    Returns:
        ConfigParser()

    Raises:
        AssertionError: if `config_file` does not exist.
    """
    if config_file is None:
        config_file = \
            os.path.join(os.path.dirname(__file__), '../default.conf')

    # ConfigParser.read() silently skips files it cannot open, so a missing
    # file must be rejected here. The error is raised explicitly (not via
    # `assert`) so the check survives `python -O`. AssertionError is kept
    # for backward compatibility with existing callers.
    if not os.path.isfile(config_file):
        raise AssertionError("Config file %s does not exist!"
                             % os.path.abspath(config_file))

    config = configparser.ConfigParser()
    config.read(config_file)

    return config