Predicting Election Night Margin 1

Posted on Thu 12 January 2023 in Python, Data Science

As tallies are reported on election night, there’s a tendency for early results to favor one political party over the other. At a county level, this is usually due to the way ballots are counted. Below is usually the order in which ballots are counted

  1. Mail-in
  2. early in-person
  3. Election Day ballots are counted

And the early methods tend to favor Democrat, who are much more likely to vote absentee and early, as opposed to Republicans, who tend to vote on election day more so then early in-person and much more likely than the mail-in method. The goal of this exercise is to build a model that adjust the early tallies to smooth out this voting method disparity. Our ideal model would adjust early vote margin so they match the final margin for any given county, and any deviation an early vote margin has from the final margin is considered error.

Data Acquisition

In [3]:
### Import required libraries.
import geopandas
import descartes
from descartes import PolygonPatch
import json
import math
import pandas as pd
import os
import requests
import numpy as np
import matplotlib.pyplot as plt
import boto3
from datetime import datetime, timezone
import pytz
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from matplotlib.animation import FuncAnimation
from matplotlib.colors import Normalize
from matplotlib import cm 
from IPython.display import HTML,display,Javascript
from collections import defaultdict
import warnings
display(HTML("<style>.container { width:100% !important; }</style>"))
warnings.filterwarnings('ignore')


from urllib.request import urlopen, Request

%matplotlib inline

Loading Dataset

Download required data sets

  • fips2county data. This data will be used cross reference fips codes with county names
  • census state cartographic boundary files. This data will be used to draw state boarders for state map using geopandas
  • census county cartographic boundary files. This data will be used to draw county boarders for state map using geopandas
In [4]:
def download(url: str, dest_folder: str):
    filename = url.split('/')[-1].replace(" ", "_")
    file_path = os.path.join(dest_folder, filename)
    
    if not os.path.exists(file_path):
        os.makedirs(dest_folder, exist_ok=True)
        r = requests.get(url, stream=True)
        if r.ok:
            print("saving to", os.path.abspath(file_path))
            with open(file_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024 * 8):
                    if chunk:
                        f.write(chunk)
                        f.flush()
                        os.fsync(f.fileno())
        else:  
            print("Download failed: status code ",r.status_code)
            print(r.text)
        
        
# Download fips data, and load it into data frame
download("https://raw.githubusercontent.com/ChuckConnell/articles/master/fips2county.tsv", dest_folder="/data")
fips_df = pd.read_csv('/data/fips2county.tsv', sep='\t', header='infer', dtype=str, encoding='latin-1')
fips_df=fips_df.set_index('CountyFIPS')
#geo county data
geo_county_df = geopandas.read_file("https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_5m.zip")
geo_county_df['fipsCode']= geo_county_df['STATEFP']+geo_county_df['COUNTYFP']
#geo state data
geo_state_df = geopandas.read_file("http://www2.census.gov/geo/tiger/GENZ2016/shp/cb_2016_us_state_5m.zip")

Parse Election

load AP election capture data from the Georgia Senate Election. The results ia dictonary indexed by capture time of election results throught eleciton night.

  • General eleciton capture will be used to train the model
  • runnoff election will be used for test data.
In [5]:
def build_ap_eleciton_data(eleciton_data):
    county_fipps=set(eleciton_data.keys())-set(['metadata','summary'])
    election_data_counties=[eleciton_data[code] for code in county_fipps ]
    df=pd.json_normalize(election_data_counties,'candidates',['fipsCode',
                                                              'precinctsReporting',
                                                              'precinctsTotal',
                                                              'eevp',
                                                              ['parameters','vote','expected','actual'],
                                                              ['parameters','vote','total'],
                                                              ['parameters','vote','registered']])

    
    df['candidateID']=df['candidateID'].apply(lambda x: eleciton_data['metadata']['candidates'][x]['first']+\
                                                        eleciton_data['metadata']['candidates'][x]['last'])
    
    df = df.set_index(['fipsCode','candidateID'])
    return df


def ap_election_from_file(election_dir):
    files = os.listdir(election_dir)
    files = sorted([(f.split('_')[0],f) for f in files if os.path.isfile(election_dir+'/'+f)]) 
    #dictionary indexed by capture time, holds dataframe of county results 
    election_timeseries={}
    #loop through each election capture 
    for epoch, file in files:
        with open(election_dir+file) as f:
            eleciton_data = json.loads(f.read())
        election_timeseries[epoch]=build_ap_eleciton_data(eleciton_data)
    return election_timeseries

def ap_election_from_S3(bucket_name, prefix ):
    s3 = boto3.client('s3')
    resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    files = [ obj['Key']   for obj in resp['Contents']]
    files = sorted(map(lambda file: (file.split('/')[-1].split('_')[0],file), files))
    election_timeseries={}
    for epoch, file in files:
        data=s3.get_object(Bucket=bucket_name, Key=file )
        contents = data['Body'].read()
        eleciton_data = json.loads(contents.decode("utf-8"))
        election_timeseries[epoch]=build_ap_eleciton_data(eleciton_data)
    return election_timeseries


runoff_timeseries = ap_election_from_S3(bucket_name = "ap-scraper",prefix = '2022/GA/ussenate/runoff/')
#election_timeseries = ap_election_from_S3(bucket_name = "ap-scraper",prefix = '2022/GA/ussenate/general/')
election_timeseries=ap_election_from_file('/data/2022/GA/ussenate/general/')

Exploratory Analysis

Display Eleciton Data

display sample AP election capture

In [8]:
epochs=list(election_timeseries.keys())
df=election_timeseries[epochs[1]]
df.reset_index()
Out[8]:
fipsCode candidateID voteCount votePct precinctsReporting precinctsTotal eevp parameters.vote.expected.actual parameters.vote.total parameters.vote.registered
0 13321 RaphaelWarnock 0 0 0 12 0 9000 0 13319
1 13321 HerschelWalker 0 0 0 12 0 9000 0 13319
2 13321 ChaseOliver 0 0 0 12 0 9000 0 13319
3 13115 RaphaelWarnock 0 0 0 25 0 38476 0 59885
4 13115 HerschelWalker 0 0 0 25 0 38476 0 59885
... ... ... ... ... ... ... ... ... ... ...
472 13005 HerschelWalker 0 0 0 1 0 4479 0 6526
473 13005 ChaseOliver 0 0 0 1 0 4479 0 6526
474 13019 RaphaelWarnock 0 0 0 5 0 7264 0 11170
475 13019 HerschelWalker 0 0 0 5 0 7264 0 11170
476 13019 ChaseOliver 0 0 0 5 0 7264 0 11170

477 rows × 10 columns

Display map of county vote change

Generate a time laps map of change in Georgia county results for Raphael Warnock for General election captures as the vote comes in throughout election night

In [9]:
EST = pytz.timezone('US/Eastern')


last_election_capture=election_timeseries[epochs[-1]]
master=pd.merge(geo_county_df,last_election_capture.reset_index().query("candidateID == 'RaphaelWarnock'"), on="fipsCode")

def plot_frame(epoch):
    colors = 'RdBu_r'
    #Create the colormap using the min/max values
    vmin = 0
    vmax = 100
    cmap = cm.ScalarMappable(Normalize(vmin, vmax), colors)
    fig, [ax,cax] = plt.subplots(1, 2, figsize = (10,10), gridspec_kw={"width_ratios":[50,1]})
    #Create the colorbar with colormap
    plt.colorbar(mappable = cmap, cax = cax)
    cax.set_title('Raphael Warnock Vote Pct')

    ax.set_title('Election Night Vote Returns By Time: {}'.format(datetime.fromtimestamp( int(epoch) ).astimezone(EST).isoformat()  ))
    master=pd.merge(geo_county_df,election_timeseries[epoch].reset_index().query("candidateID == 'RaphaelWarnock'"), on="fipsCode")
    geo_state_df.query("NAME == 'Georgia'").plot(ax=ax, edgecolor="black", color="white")
    master.plot(ax=ax, column='votePct',vmin = vmin, vmax=vmax,cmap = colors)
    plt.close()
    return ax, fig
# queue up the captures to where results start coming in
start=0
for idx,epoch in enumerate(epochs):
    if election_timeseries[epoch].reset_index().query("candidateID == 'RaphaelWarnock'")['votePct'].sum()>0:
        start=idx
        break
for idx,epoch in enumerate(epochs[(start-1):]):
     ax, fig = plot_frame(epoch) 
     fig.savefig('../img/img{:03d}.png'.format(idx), dpi=100, format='png', bbox_inches='tight')
In [10]:
%%bash
cd ../img;
# Create a gif viz
ffmpeg -y -framerate 12  -i  img%03d.png -f gif out.gif> /dev/null 2>&1

map_of_county_vote_change

Linear Regression Plot

fist, we'll try to fit a linear model to the vote precentage retuned for each county throughtout election night, then we'll see how good of a fit a general linear model represents the data by examining the residuals of our linear model fit

In [85]:
ncol = 4
sorted_vote_count_df=election_timeseries[epochs[-1]].sort_values('parameters.vote.expected.actual', ascending=False)
sorted_vote_count_df=sorted_vote_count_df.xs('RaphaelWarnock', level='candidateID', drop_level=False).reset_index() 
nrow = math.ceil(sorted_vote_count_df['fipsCode'].head(20).shape[0]/ncol)
i = 1
f, axs = plt.subplots( ncols=ncol, nrows=nrow,figsize=( 4.0 * ncol,4.0 * nrow))
axs = axs.flatten()
for ax,fips in zip(axs, sorted_vote_count_df['fipsCode'].head(20)):
    df=pd.concat([election_timeseries[epoch].loc[fips,'RaphaelWarnock'] for epoch in epochs],  axis=1).T
    df=df.reset_index(drop=True)
    df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
    edf=df[0<df['votePct'] ].drop_duplicates()
    order=1
    ax.set_title(fips_df.loc[fips]['CountyName']+' County Vote % Raphael Warnock')
    sns.regplot(x='ReportingPct', y='votePct', data=edf.astype(float), order=order, ci=None,  ax=ax)
    i += 1
plt.tight_layout() 
plt.show()
In [86]:
ncol = 4
sorted_vote_count_df=election_timeseries[epochs[-1]].sort_values('parameters.vote.expected.actual', ascending=False)
sorted_vote_count_df=sorted_vote_count_df.xs('RaphaelWarnock', level='candidateID', drop_level=False).reset_index() 
nrow = math.ceil(sorted_vote_count_df['fipsCode'].head(20).shape[0]/ncol)
i = 1
f, axs = plt.subplots( ncols=ncol, nrows=nrow,figsize=( 4.0 * ncol,4.0 * nrow))
axs = axs.flatten()
for ax,fips in zip(axs, sorted_vote_count_df['fipsCode'].head(20)):
    df=pd.concat([election_timeseries[epoch].loc[fips,'RaphaelWarnock'] for epoch in epochs],  axis=1).T
    df=df.reset_index(drop=True)
    df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
    edf=df[0<df['votePct'] ].drop_duplicates()
    order=1
    ax.set_title(fips_df.loc[fips]['CountyName']+' County Vote % Raphael Warnock')

    #sns.regplot(x='ReportingPct', y='votePct', data=edf.astype(float), order=order, ci=None, scatter_kws={"s": 80}, ax=ax)
    sns.residplot(x='ReportingPct', y='votePct', data=edf.astype(float), order=order,  ax=ax)
    ax.set(ylabel='residuals',xlabel='fitted values')

    i += 1
plt.tight_layout() 
plt.show()

Polynomial Regression Plot

Perhaps a a polynaomial model is a better fit for the data, so let's plot a polynomial regression fit to data as well.

In [87]:
ncol = 4
nrow = math.ceil(sorted_vote_count_df['fipsCode'].head(20).shape[0]/ncol)

i = 1
f, axs = plt.subplots( ncols=ncol, nrows=nrow,figsize=( 4.0 * ncol,4.0 * nrow))
axs = axs.flatten()
for ax,fips in zip(axs, sorted_vote_count_df['fipsCode'].head(20)):
    df=pd.concat([election_timeseries[epoch].loc[fips,'RaphaelWarnock'] for epoch in epochs],  axis=1).T
    df=df.reset_index(drop=True)
    df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
    edf=df[0<df['votePct'] ].drop_duplicates()
    if edf.shape[0]>3:
        order=2
    else:
        order=1
    ax.set_title(fips_df.loc[fips]['CountyName']+' County Vote % Raphael Warnock')

    sns.regplot(x='ReportingPct', y='votePct', data=edf.astype(float), order=order, ci=None, scatter_kws={"s": 80}, ax=ax)
    i += 1
plt.tight_layout() 
plt.show()
In [88]:
ncol = 4
nrow = math.ceil(sorted_vote_count_df['fipsCode'].head(20).shape[0]/ncol)

i = 1
f, axs = plt.subplots( ncols=ncol, nrows=nrow,figsize=( 4.0 * ncol,4.0 * nrow))
axs = axs.flatten()
for ax,fips in zip(axs, sorted_vote_count_df['fipsCode'].head(20)):
    df=pd.concat([election_timeseries[epoch].loc[fips,'RaphaelWarnock'] for epoch in epochs],  axis=1).T
    df=df.reset_index(drop=True)
    df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
    edf=df[0<df['votePct'] ].drop_duplicates()
    if edf.shape[0]>3:
        order=2
    else:
        order=1

    ax.set_title(fips_df.loc[fips]['CountyName']+' County Vote % Raphael Warnock')

    sns.residplot(x='ReportingPct', y='votePct', data=edf.astype(float), order=order,  lowess=True, ax=ax)
    ax.set(ylabel='residuals',xlabel='fitted values')
    i += 1
plt.tight_layout() 
plt.show()

Fitting Model

Before we fit our model, let's get a baseline error without any adjustment. This would be the RMSD between the count of an election capture throughout election night and the final result in a county.

In [11]:
def RMSD(dfp,dfx):
    return(((dfp - dfx) ** 2).mean() ** .5)
runoff_epoch_keys=list(runoff_timeseries.keys())
last_election_capture_runoff=runoff_timeseries[runoff_epoch_keys[-1]]
true_result=last_election_capture_runoff.xs('RaphaelWarnock', level='candidateID')['voteCount'].sum()/last_election_capture_runoff['voteCount'].sum()

general_epochs=list(election_timeseries.keys())
last_election_capture=election_timeseries[general_epochs[-1]]

error=0
for epoch in runoff_epoch_keys:
    df=runoff_timeseries[epoch]
    runoff_indexes=df[df['eevp']>10].index
    if len(runoff_indexes):
        adjusted_eevp_df= df.loc[runoff_indexes]['votePct']
        vote_count=df.loc[runoff_indexes]['parameters.vote.expected.actual']*adjusted_eevp_df/100
        error+= RMSD(vote_count.xs('RaphaelWarnock', level='candidateID'),last_election_capture_runoff.loc[runoff_indexes].xs('RaphaelWarnock', level='candidateID')['voteCount']) 
print(f'Cumulative baseline error of Raphael Warnock\'s margins in the runoff election {error=}')
Cumulative baseline error of Raphael Warnock's margins in the runoff election error=939503.0890928901

1. Linear Regression

We use the 2022 georgia general election to build our linear regression model

In [12]:
class Election_Model():
    def __init__(self,model,*model_list_args,**model_dict_args):
        def default_value():
            return model(*model_list_args,**model_dict_args)
        def default_index():
            return defaultdict(default_value)
         
        self.election_model_dict= defaultdict(default_index)

    def __getitem__(self, index):
        return self.election_model_dict[index]

        
    

linear_model = Election_Model(LinearRegression);


for fips in last_election_capture.reset_index()['fipsCode'].unique():
    for candidate_id in last_election_capture.loc[fips].index:

        df=pd.concat([election_timeseries[epoch].loc[fips,candidate_id] for epoch in general_epochs],  axis=1).T
        df=df.reset_index(drop=True)
        df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
        edf=df[0<df['votePct'] ].drop_duplicates()

        x = np.array([edf['ReportingPct'].array.astype(float)]).T
        y = np.array([edf['votePct'].array.astype(float)]).T

        linear_model[fips][candidate_id].fit(x, y)

test our linear model with data from the 2022 georgia runoff eleciton

In [16]:
error=0
for epoch in runoff_epoch_keys:
    df=runoff_timeseries[epoch]
    runoff_indexes=df[df['eevp']>10].index
    if len(runoff_indexes):
        adjusted_eevp_df=np.array([ linear_model[fips][candidate_id].predict(np.array([[100]]))[0][0]- linear_model[fips][candidate_id].predict(np.array([[ df.loc[fips, candidate_id]['eevp']  ]]))[0][0] for fips, candidate_id in runoff_indexes])  + df.loc[runoff_indexes]['votePct']
        vote_count=df.loc[runoff_indexes]['parameters.vote.expected.actual']*adjusted_eevp_df/100
        error+= RMSD(vote_count.xs('RaphaelWarnock', level='candidateID'),last_election_capture_runoff.loc[runoff_indexes].xs('RaphaelWarnock', level='candidateID')['voteCount']) 
print(f'Cumulative simple regression error of Raphael Warnock\'s margins in the runoff election {error=}')
Cumulative simple regression error of Raphael Warnock's margins in the runoff election error=723365.5316531633

2. Polynomial Regression

We use the 2022 georgia general election to build our polynomial regression model

In [14]:
poly_model = Election_Model(LinearRegression);
poly = PolynomialFeatures(2)



for fips in last_election_capture.reset_index()['fipsCode'].unique():
    for candidate_id in last_election_capture.loc[fips].index:

        
        
        df=pd.concat([election_timeseries[epoch].loc[fips,candidate_id] for epoch in general_epochs],  axis=1).T
        df=df.reset_index(drop=True)
        df['ReportingPct']=(df['parameters.vote.total']/df['parameters.vote.expected.actual'])*100
        edf=df[0<df['votePct'] ].drop_duplicates()
 
        x = np.array([edf['ReportingPct'].array.astype(float)]).T
        x = poly.fit_transform(x)
        y = np.array([edf['votePct'].array.astype(float)]).T

        poly_model[fips][candidate_id].fit(x, y)

test our polynomial model with data from the 2022 georgia runoff eleciton

In [17]:
error=0
poly = PolynomialFeatures(2)

for epoch in runoff_epoch_keys:
    df=runoff_timeseries[epoch]
    runoff_indexes=df[df['eevp']>10].index
    if len(runoff_indexes):
        adjusted_eevp_df=np.array([ poly_model[fips][candidate_id].predict(  poly.fit_transform( np.array([[100]]) ))[0][0]- poly_model[fips][candidate_id].predict(  poly.fit_transform( np.array([[ df.loc[fips, candidate_id]['eevp']  ]]) )   )[0][0] for fips, candidate_id in runoff_indexes])  + df.loc[runoff_indexes]['votePct']
        vote_count=df.loc[runoff_indexes]['parameters.vote.expected.actual']*adjusted_eevp_df/100
        error+= RMSD(vote_count.xs('RaphaelWarnock', level='candidateID'),last_election_capture_runoff.loc[runoff_indexes].xs('RaphaelWarnock', level='candidateID')['voteCount']) 
 
print(f'Cumulative polynomial regression  error of Raphael Warnock\'s margins in the runoff election {error=}')
Cumulative polynomial regression  error of Raphael Warnock's margins in the runoff election error=732958.2334045434

Conclusion

In this exercise, our goal was to build an election night margin prediction model. As results come in throughout election night, early results are usually skewed based on voting patterns. Early in-person and mail-in votes are usually counted first, election day votes are counted after, and as the data shows, most counties trend democrat early, and progressively get more republican as counting continues. Throughout election night, our ideal model would adjust the percentage reported so-far to match the final result, and any deviation from the final results is error. Our cumulative baseline root-squared-error without any adjustment was 939503. With a simple linear model our error improved to 723365, but the residuals showed that the data might benefit from polynomial adjustment, this however produced worse results in practice. Our polynomial model's error worsened to 732958, perhaps due to overfitting, so a linear model worked best for predicting the election night margins for the Georgia runoff election.

  • Cumulative baseline error 939503.0890928901
  • Cumulative simple regression error 723365.5316531633
  • Cumulative polynomial regression error 732958.2334045434