
Feature/extracting sao information (#54)
* Derived the parser post-processing steps to capture SAO number and suffix information.

* Small improvements: allowed additional characters in the identification of flat ranges and building numbers, and changed the criteria.

* Added a new clause and improved the single-component clauses so that they do not include ranges.

* Fixed the case where searching for “65A” would accidentally also match within “55A-65A”. Negative lookahead and lookbehind are used to rule out the latter (see the regex sketch after this list).

* Simplified the parsing and pushed the modifications to the post-processing that operates on data frames rather than inside the loop.

* Created a new class for the address parser that holds all the pre- and post-processing steps.

* A new class that can be used to do normalisation, probabilistic parsing, and post-processing.

* AddressParser class is now complete and linking tests pass.

* Simple script to use the SAO suffix data as a test set.

* Modified the local hybrid index creation so that it uses out-of-core computation (dask).

* A few modifications to the address parser to reflect what was done inside the address linker.

* Updated the address linker to use the address parser class.

* Moved the creation of the final hybrid index from address linking to the data file. Added a function to create a test index for testing the linking code.

* A quick fix as test_index_uprns is a numpy array rather than a pandas data frame.

* Documentation changes to the address linking.

* Small bug fix to the parsing post-processing logic. Changed the address linking UPRN type to float64 to support comparisons and missing values.

* Some improvements to the matching logic.

* Added an extraction step to the parser post-processing which identifies numbers from street names. This happens for messy inputs where the street hasn’t been entered correctly.

* Added a new blocking rule to the matching engine.
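
The bullet about negative lookahead and lookbehind refers to matching a standalone number-plus-suffix token such as “65A” without also matching inside a range such as “55A-65A”. The commit page does not include the actual pattern, so the snippet below is only an illustrative sketch of the technique; the pattern name and test strings are invented for the example.

import re

# Illustrative sketch only - not the parser's actual pattern.
# Match a number + suffix token such as "65A" when it stands alone,
# but not when it is part of a range such as "55A-65A":
#   (?<![-\dA-Z])  no digit, capital letter or hyphen immediately before the token
#   (?![-\dA-Z])   no digit, capital letter or hyphen immediately after the token
standalone_sao = re.compile(r'(?<![-\dA-Z])(\d+[A-Z])(?![-\dA-Z])')

print(standalone_sao.findall('FLAT 65A EXAMPLE STREET'))      # ['65A']
print(standalone_sao.findall('FLAT 55A-65A EXAMPLE STREET'))  # []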
saniemi authored Mar 7, 2017
1 parent fc81285 commit 4885980
Showing 5 changed files with 998 additions and 586 deletions.
236 changes: 193 additions & 43 deletions DataScience/Analytics/data/data.py
@@ -5,12 +5,17 @@
A simple script containing methods to query or modify the ONS AddressBase data.
Uses Dask for larger-than-memory computations. Can also use distributed to spread
the computations over a cluster if needed.
Requirements
------------
:requires: dask (tested with 0.14.0)
:requires: distributed (tested with 1.16.0)
:requires: numpy (tested with 1.12.0)
:requires: pandas (tested with 0.19.2)
:requires: sqlalchemy
:requires: tqdm (https://github.com/tqdm/tqdm)
@@ -24,15 +29,19 @@
Version
-------
:version: 1.0
:date: 2-Mar-2016
"""
import glob
import os
import re
import sqlite3

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.diagnostics import ProgressBar
from distributed import Client, LocalCluster
from sqlalchemy import create_engine
from tqdm import tqdm

@@ -74,8 +83,9 @@ def getPostcode(string):
"""
try:
tmp = \
re.findall(r'[A-PR-UWYZ0-9][A-HK-Y0-9][AEHMNPRTVXY0-9]?[ABEHMNPRVWXY0-9]? {1,2}[0-9][ABD-HJLN-UW-Z]{2}|GIR 0AA',
string)[0]
re.findall(
r'[A-PR-UWYZ0-9][A-HK-Y0-9][AEHMNPRTVXY0-9]?[ABEHMNPRVWXY0-9]? {1,2}[0-9][ABD-HJLN-UW-Z]{2}|GIR 0AA',
string)[0]
except ValueError:
tmp = None

@@ -190,65 +200,81 @@ def combineMiniABtestingData():
    data.to_csv(path + 'ABmini.csv', index=0)


def combine_address_base_data(path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRESSBASE/',
                              filename='AB.csv', distributed=False):
"""
Read in all the Address Base Epoch 39 CSV files and combine to a single CSV file.
Only relevant information is retained to compress the AB for easier handling.
.. Note:: Uses Dask so that the datasets do not need to fit in memory. This is not very efficient
as the join is using a column not index. However, as UPRN is not unique using it as index
has a penalty too.
:param path: location of the AddressBase CSV files
:type path: str
:param filename: name of the output file
:type filename: str
:param distributed:
:type distributed: bool
:return: None
"""
    if distributed:
        cluster = LocalCluster(n_workers=4, threads_per_worker=1)
        client = Client(cluster)
        print(client)

    all_files = glob.glob(path + 'ABP_E39_*.csv')
    files = [file for file in all_files if ('STREET.csv' not in file)]

    data_container = dict()
    for file in files:
        if 'BLPU' in file:
            columns = ['UPRN', 'POSTCODE_LOCATOR']
            id = 'BLPU'

        if 'DELIVERY_POINT' in file:
            columns = ['UPRN', 'ORGANISATION_NAME', 'DEPARTMENT_NAME', 'SUB_BUILDING_NAME',
                       'BUILDING_NAME', 'BUILDING_NUMBER', 'THROUGHFARE', 'DEPENDENT_LOCALITY',
                       'POST_TOWN', 'POSTCODE']
            id = 'DP'

        if 'LPI' in file:
            columns = ['UPRN', 'USRN', 'LANGUAGE', 'PAO_TEXT', 'PAO_START_NUMBER', 'PAO_START_SUFFIX',
                       'PAO_END_NUMBER', 'PAO_END_SUFFIX', 'SAO_TEXT', 'SAO_START_NUMBER', 'SAO_START_SUFFIX',
                       'SAO_END_NUMBER', 'SAO_END_SUFFIX']
            id = 'LPI'

        if 'STREET_DESC' in file:
            columns = ['USRN', 'STREET_DESCRIPTOR', 'TOWN_NAME', 'LANGUAGE', 'LOCALITY']
            id = 'ST'

        if 'ORGANISATION' in file:
            columns = ['UPRN', 'ORGANISATION']
            id = 'ORG'

        print('Reading in', file)
        data_container[id] = dd.read_csv(file, dtype=str, usecols=columns)

    print('joining the individual data frames to form a single hybrid index...')
    data = dd.merge(data_container['BLPU'], data_container['DP'], how='left', on='UPRN')
    data = dd.merge(data, data_container['LPI'], how='left', on='UPRN')
    data = dd.merge(data, data_container['ORG'], how='left', on=['UPRN'])
    data = dd.merge(data, data_container['ST'], how='left', on=['USRN', 'LANGUAGE'])

    if distributed:
        data = dd.compute(data)[0]
    else:
        with ProgressBar():
            data = dd.compute(data)[0]

    print('change the uprn type to int...')
    data['UPRN'] = data['UPRN'].astype(int)

    print('drop all entries with no UPRN...')
    data = data[np.isfinite(data['UPRN'].values)]

    print('drop unnecessary columns...')
    data.drop(['LANGUAGE', 'USRN'], axis=1, inplace=True)

    print(data.info())
@@ -339,7 +365,7 @@ def create_NLP_index(path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRE

    columns = {'BLPU': ['UPRN', 'POSTCODE_LOCATOR'],
               'LPI': ['UPRN', 'USRN', 'LANGUAGE', 'PAO_TEXT', 'PAO_START_NUMBER', 'PAO_START_SUFFIX', 'PAO_END_NUMBER',
                       'PAO_END_SUFFIX', 'SAO_TEXT', 'SAO_START_NUMBER', 'SAO_START_SUFFIX', 'OFFICIAL_FLAG'],
               'STREET_DESC': ['USRN', 'STREET_DESCRIPTOR', 'TOWN_NAME', 'LANGUAGE', 'LOCALITY'],
               'ORGANISATION': ['UPRN', 'ORGANISATION']}

@@ -382,16 +408,17 @@ def create_NLP_index(path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRE
    data.to_csv(path + filename, index=False)


def create_random_sample_of_delivery_point_addresses(
        path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRESSBASE/',
        size=100000):
    """
    Create a random sample of delivery point addresses and store the single string addresses to a file.

    :param path: location of the AddressBase delivery point CSV file
    :param size: number of addresses to include in the random sample

    :return: None
    """
    # read in delivery point table
    delivery_point = pd.read_csv(path + 'ABP_E39_DELIVERY_POINT.csv', dtype=str)
    delivery_point['UPRN'] = delivery_point['UPRN'].astype(np.int64)
    print(len(delivery_point.index), 'delivery point addresses...')

@@ -411,14 +438,137 @@ def create_random_sample_of_delivery_point_addresses(path='/Users/saminiemi/Proj
    # write to a file - UPRN and a single string address from the delivery point
    data = data.fillna('')
    data['ADDRESS'] = data["ORGANISATION_NAME"] + ' ' + data["DEPARTMENT_NAME"] + ' ' + data["SUB_BUILDING_NAME"] + ' '\
                      + data["BUILDING_NAME"] + ' ' + data["BUILDING_NUMBER"] + ' ' + data["THROUGHFARE"] + ' ' + \
                      data["POST_TOWN"] + ' ' + data["POSTCODE"]

    data = data[['UPRN', 'ADDRESS']]
    print('Storing single string delivery point addresses to a file...')
    data.to_csv(path + 'delivery_point_addresses.csv', index=False)


def create_final_hybrid_index(path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRESSBASE/', filename='AB.csv',
                              output_filename='AB_processed.csv'):
    """
    A function to load an initial version of the hybrid index as produced by combine_address_base_data
    and to process it to the final hybrid index used in matching.

    .. Warning:: This method modifies the original AB information by e.g. combining different tables. Such
                 activities are undertaken because of the aggressive blocking the prototype linking code uses.
                 The actual production system should take AB as it is and the linking should not perform blocking
                 but rather be flexible and take into account that in NAG the information can be stored in various
                 fields.
    """
    address_base = pd.read_csv(path + filename,
                               dtype={'UPRN': np.int64, 'POSTCODE_LOCATOR': str, 'ORGANISATION_NAME': str,
                                      'DEPARTMENT_NAME': str, 'SUB_BUILDING_NAME': str, 'BUILDING_NAME': str,
                                      'BUILDING_NUMBER': str, 'THROUGHFARE': str, 'DEPENDENT_LOCALITY': str,
                                      'POST_TOWN': str, 'POSTCODE': str, 'PAO_TEXT': str,
                                      'PAO_START_NUMBER': str, 'PAO_START_SUFFIX': str, 'PAO_END_NUMBER': str,
                                      'PAO_END_SUFFIX': str, 'SAO_TEXT': str, 'SAO_START_NUMBER': np.float64,
                                      'SAO_START_SUFFIX': str, 'ORGANISATION': str, 'STREET_DESCRIPTOR': str,
                                      'TOWN_NAME': str, 'LOCALITY': str, 'SAO_END_NUMBER': np.float64,
                                      'SAO_END_SUFFIX': str})
    print('Found {} addresses from the combined AddressBase file...'.format(len(address_base.index)))

    # remove street records from the list of potential matches - this makes the search space slightly smaller
    exclude = 'STREET RECORD|ELECTRICITY SUB STATION|PUMPING STATION|POND \d+M FROM|PUBLIC TELEPHONE|'
    exclude += 'PART OF OS PARCEL|DEMOLISHED BUILDING|CCTV CAMERA|TANK \d+M FROM|SHELTER \d+M FROM|TENNIS COURTS|'
    exclude += 'PONDS \d+M FROM|SUB STATION'
    msk = address_base['PAO_TEXT'].str.contains(exclude, na=False, case=False)
    address_base = address_base.loc[~msk]

    # combine information - could be done differently, but for now using some of these for blocking
    msk = address_base['THROUGHFARE'].isnull()
    address_base.loc[msk, 'THROUGHFARE'] = address_base.loc[msk, 'STREET_DESCRIPTOR']

    msk = address_base['BUILDING_NUMBER'].isnull()
    address_base.loc[msk, 'BUILDING_NUMBER'] = address_base.loc[msk, 'PAO_START_NUMBER']

    msk = address_base['BUILDING_NAME'].isnull()
    address_base.loc[msk, 'BUILDING_NAME'] = address_base.loc[msk, 'PAO_TEXT']

    msk = address_base['ORGANISATION_NAME'].isnull()
    address_base.loc[msk, 'ORGANISATION_NAME'] = address_base.loc[msk, 'ORGANISATION']

    msk = address_base['POSTCODE'].isnull()
    address_base.loc[msk, 'POSTCODE'] = address_base.loc[msk, 'POSTCODE_LOCATOR']

    msk = address_base['SUB_BUILDING_NAME'].isnull()
    address_base.loc[msk, 'SUB_BUILDING_NAME'] = address_base.loc[msk, 'SAO_TEXT']

    msk = address_base['POST_TOWN'].isnull()
    address_base.loc[msk, 'POST_TOWN'] = address_base.loc[msk, 'TOWN_NAME']

    msk = address_base['POSTCODE'].isnull()
    address_base.loc[msk, 'POSTCODE'] = address_base.loc[msk, 'POSTCODE_LOCATOR']

    msk = address_base['LOCALITY'].isnull()
    address_base.loc[msk, 'LOCALITY'] = address_base.loc[msk, 'DEPENDENT_LOCALITY']

    # sometimes addressbase does not have SAO_START_NUMBER even if SAO_TEXT clearly has a number
    # take the digits from SAO_TEXT and place them to SAO_START_NUMBER if this is empty
    msk = address_base['SAO_START_NUMBER'].isnull() & (address_base['SAO_TEXT'].notnull())
    address_base.loc[msk, 'SAO_START_NUMBER'] = pd.to_numeric(
        address_base.loc[msk, 'SAO_TEXT'].str.extract('(\d+)'))

    # normalise street names so that st. is always st and 's is always s
    msk = address_base['THROUGHFARE'].str.contains('ST\.\s', na=False, case=False)
    address_base.loc[msk, 'THROUGHFARE'] = address_base.loc[msk, 'THROUGHFARE'].str.replace('ST\. ', 'ST ')
    msk = address_base['THROUGHFARE'].str.contains("'S\s", na=False, case=False)
    address_base.loc[msk, 'THROUGHFARE'] = address_base.loc[msk, 'THROUGHFARE'].str.replace("'S\s", 'S ')

    # drop some that are not needed - in the future versions these might be useful
    address_base.drop(['DEPENDENT_LOCALITY', 'POSTCODE_LOCATOR', 'ORGANISATION'], axis=1, inplace=True)

    # split postcode to in and outcode - useful for doing blocking in different ways
    postcodes = address_base['POSTCODE'].str.split(' ', expand=True)
    postcodes.rename(columns={0: 'postcode_in', 1: 'postcode_out'}, inplace=True)
    address_base = pd.concat([address_base, postcodes], axis=1)

    print('Using {} addresses from the final hybrid index...'.format(len(address_base.index)))

    print(address_base.info(verbose=True, memory_usage=True, null_counts=True))
    address_base.to_csv(path + output_filename, index=False)


def create_test_hybrid_index(path='/Users/saminiemi/Projects/ONS/AddressIndex/data/ADDRESSBASE/',
                             filename='AB_processed.csv', output_filename='ABtest.csv'):
    """
    Updates an existing test index to reflect changes made to the processed final hybrid index.

    :param path: location of the hybrid index and test index files
    :param filename: name of the processed hybrid index file
    :param output_filename: name of the test index file to update

    :return: None
    """
    address_base = pd.read_csv(path + filename,
                               dtype={'UPRN': np.int64, 'POSTCODE_LOCATOR': str, 'ORGANISATION_NAME': str,
                                      'DEPARTMENT_NAME': str, 'SUB_BUILDING_NAME': str, 'BUILDING_NAME': str,
                                      'BUILDING_NUMBER': str, 'THROUGHFARE': str, 'DEPENDENT_LOCALITY': str,
                                      'POST_TOWN': str, 'POSTCODE': str, 'PAO_TEXT': str,
                                      'PAO_START_NUMBER': str, 'PAO_START_SUFFIX': str, 'PAO_END_NUMBER': str,
                                      'PAO_END_SUFFIX': str, 'SAO_TEXT': str, 'SAO_START_NUMBER': np.float64,
                                      'SAO_START_SUFFIX': str, 'ORGANISATION': str, 'STREET_DESCRIPTOR': str,
                                      'TOWN_NAME': str, 'LOCALITY': str, 'SAO_END_NUMBER': np.float64,
                                      'SAO_END_SUFFIX': str})
    print('Found {} addresses from the hybrid index...'.format(len(address_base.index)))

    test_index_uprns = pd.read_csv(path + output_filename, usecols=['UPRN'], dtype={'UPRN': np.int64})['UPRN'].values
    print('Found {} addresses from the test index...'.format(len(test_index_uprns)))

    # find the overlap
    mask = np.in1d(address_base['UPRN'].values, test_index_uprns)

    # output to a file - overwrites the old test index
    address_base_test_index = address_base.loc[mask]
    address_base_test_index.to_csv(path + output_filename, index=False)
    print(address_base_test_index.info())


if __name__ == "__main__":
    create_NLP_index()
    create_random_sample_of_delivery_point_addresses()
    combine_address_base_data()
    create_final_hybrid_index()
    convertCSVtoSQLite()

    create_test_hybrid_index()