Populating Timestream With Scraped Predictit Market Ticks 1

Posted on Wed 04 October 2023 in Election Modeling

Amazon Timestream is a fully managed time series database and a relatively new addition to the AWS serverless portfolio. Its main use case is storing IoT device-generated time series data, but our goal is to eventually build a notification and monitoring system on top of prediction market tick data that we'll store in Timestream. First we need to capture that data: we'll use Scrapy, an application framework for crawling websites, to scrape market data from predictit.org and store the market ticks in AWS Timestream. This will be an initial exploratory working session. In the next section, we'll run the full parse-and-store workflow on a schedule to capture real-time time series market data.

Architecture

The Python code to capture election night data will be hosted in an AWS Lambda function. An EventBridge trigger will execute the code every minute, and each capture will be saved to an S3 bucket.

Architecture

Scrapy will be doing the heavy lifting: the low-level networking, scheduling, and downloading functionality. We'll implement the two main parts of the scraping interface:

  • Spider – The entry point for our requests. This is where we specify the initial URL to crawl, then parse the response and schedule subsequent requests in our scraping workflow.
  • Item Pipeline – Once extracted by the spider, an item is processed in the Item Pipeline. In our case, this is where we write our scraped predictit market data to Timestream. The pipeline is registered in the project's Scrapy settings, as sketched below.
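A minimal settings.py sketch showing how those two pieces are wired together. The package name predictit_scraper, the pipeline priority, and the database/table constants here are assumptions for illustration, not necessarily the project's actual values:

# settings.py -- minimal sketch; "predictit_scraper" is a placeholder package name
BOT_NAME = "predictit_scraper"

# Register the item pipeline so every item the spider yields is passed to
# MarketsPipeline.process_item (the number controls ordering; lower runs first).
ITEM_PIPELINES = {
    "predictit_scraper.pipelines.MarketsPipeline": 300,
}

# Timestream targets referenced by the pipeline as settings.DATABASE_NAME /
# settings.TABLE_NAME (assumed to match the database and table created below).
DATABASE_NAME = "prediction_markets"
TABLE_NAME = "predictit"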

Timestream

Before we implement the code to scrape predictit markets, we need to create our Timestream database. Let's take a look at the data we're parsing to get an idea of our schema.

Schema

market_contract

Predictit consists of markets at the top level, with contracts within each market. Each market is indexed by a market ID, and each contract is associated with a contract ID. In the above example:

Market Name     "Which party will win the 2024 US Senate election in Arizona?"
Market ID       8070
Contract Names  "Democratic", "Republican", "Independent"
Contract IDs    31257, 31256, 31258

A dimension represents the metadata attributes of the time series. The values above will comprise our Timestream dimensions.

Based on the Market ID, the Predictit API returns the contract-level data below.

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="ArrayOfContractListResourceModel">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="ContractListResourceModel" maxOccurs="unbounded" minOccurs="0">
          <xs:complexType>
            <xs:sequence>
              <xs:element type="xs:short" name="ContractId"/>
              <xs:element type="xs:string" name="ContractName"/>
              <xs:element type="xs:short" name="MarketId"/>
              <xs:element type="xs:string" name="MarketName"/>
              <xs:element type="xs:string" name="ContractImageUrl"/>
              <xs:element type="xs:string" name="ContractImageSmallUrl"/>
              <xs:element type="xs:string" name="IsActive"/>
              <xs:element type="xs:string" name="IsOpen"/>
              <xs:element type="xs:float" name="LastTradePrice"/>
              <xs:element type="xs:float" name="LastClosePrice"/>
              <xs:element type="xs:float" name="BestYesPrice"/>
              <xs:element type="xs:short" name="BestYesQuantity"/>
              <xs:element type="xs:string" name="BestNoPrice"/>
              <xs:element type="xs:short" name="BestNoQuantity"/>
              <xs:element type="xs:byte" name="UserPrediction"/>
              <xs:element type="xs:byte" name="UserQuantity"/>
              <xs:element type="xs:byte" name="UserOpenOrdersBuyQuantity"/>
              <xs:element type="xs:byte" name="UserOpenOrdersSellQuantity"/>
              <xs:element type="xs:byte" name="UserAveragePricePerShare"/>
              <xs:element type="xs:string" name="IsTradingSuspended"/>
              <xs:element type="xs:dateTime" name="DateOpened"/>
              <xs:element type="xs:string" name="HiddenByDefault"/>
              <xs:element type="xs:byte" name="DisplayOrder"/>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>

A measure is the actual value being measured by a record in the time series. From the above, the measures we're going to store are:

  • BestNoPrice
  • BestYesQuantity
  • BestYesPrice
  • BestNoQuantity
  • LastTradePrice
  • LastClosePrice

Below is the final schema of our Timestream table.

Column Type Timestream attribute type
Contract_ID varchar DIMENSION
Market_Name varchar DIMENSION
Market_ID varchar DIMENSION
Contract_Name varchar DIMENSION
measure_name varchar MEASURE_NAME
BestNoPrice double MULTI
BestYesQuantity bigint MULTI
BestYesPrice double MULTI
BestNoQuantity bigint MULTI
LastTradePrice double MULTI
LastClosePrice double MULTI
time timestamp TIMESTAMP

The only other column, besides the measures and the dimensions, is the TIMESTAMP.

Database

First we'll create our database.

Timestream_create_database

We'll name our database prediction_markets and keep all other settings at their defaults.
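The console is all we need here, but the same step can be scripted. A minimal boto3 sketch, assuming AWS credentials and region are already configured in the environment:

import boto3

# Create the Timestream database programmatically -- equivalent to the console step above
ts_write = boto3.client("timestream-write")
ts_write.create_database(DatabaseName="prediction_markets")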

Table

We'll call our table predictit.

Timestream_create_table

Timestream allows only one partition key per table. Our main use case will be querying by Contract_ID, so that will be our partition key; we'll keep all other settings at their defaults.
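For reference, a boto3 sketch of the equivalent table creation. The retention periods are illustrative assumptions rather than the console defaults; the customer-defined partition key is set on the Contract_ID dimension as described above:

import boto3

ts_write = boto3.client("timestream-write")
ts_write.create_table(
    DatabaseName="prediction_markets",
    TableName="predictit",
    # Retention values are illustrative assumptions
    RetentionProperties={
        "MemoryStoreRetentionPeriodInHours": 24,
        "MagneticStoreRetentionPeriodInDays": 365,
    },
    # Customer-defined partition key on the Contract_ID dimension
    Schema={
        "CompositePartitionKey": [
            {"Type": "DIMENSION", "Name": "Contract_ID", "EnforcementInRecord": "REQUIRED"}
        ]
    },
)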

Scrapy

Now that we have our Timestream database created, we can implement our predictit scraping framework. The full code can be found here. The entry point for Scrapy is the spider class.

Spider

Spiders are classes where we define the custom behaviour for crawling and parsing pages for a particular site. For Predictit, we have two main pages to crawl:

  1. https://www.predictit.org/api/marketdata/all/ - Data for all markets on the site
  2. https://www.predictit.org/api/marketdata/markets/[Market_ID] - Data for a specific market indexed by Market_ID

We first get the list of all markets from api/marketdata/all/, and then fetch detailed contract data for each specific market from api/marketdata/markets/[Market_ID].

Below is the entry point for our spider class:

class MarketSpider(scrapy.Spider):
    name = "markets"
    def start_requests(self):
        #crawl main list of markets from marketdata/all to get list of markets
        query = {}
        base_url='https://www.predictit.org/'
        path='api/marketdata/all'
        urls = [
            build_url(base_url,path,query),
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

The first requests to perform are obtained by calling the start_requests() method, which generates a Request for each URL specified in urls, with the parse method as the callback function for those Requests.
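The build_url helper isn't shown here; it lives with the rest of the project code. A minimal sketch of what it might look like, assuming it simply joins the base URL, path, and optional query parameters:

from urllib.parse import urlencode, urljoin

def build_url(base_url, path, query):
    # Join the base URL and path, then append an encoded query string if one was given
    url = urljoin(base_url, path)
    if query:
        url = f"{url}?{urlencode(query)}"
    return url

Back in the spider, the parse callback handles the response from api/marketdata/all/: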

    def parse(self, response):
        time = datetime.utcnow().isoformat()
        #once list of markets is obtained from api/marketdata/all/
        #crawl contract data for each individual market through api/Market/
        base_url = 'https://www.predictit.org/'
        base_path = 'api/Market/'
        query = {}
        for market in response.css('MarketData'):
            loader = ItemLoader(item=MarketsItem(), selector=market)
            loader.add_css('ID', 'ID::text')
            loader.add_css('Name', 'Name::text')
            loader.add_css('URL', 'URL::text')
            loader.add_value('Time', time)
            market_id = market.css('ID::text').get()
            self.log(f'market_id:{market_id}')
            if market_id in MARKET_WATCHLIST:
                #market is in our watchlist: crawl detailed contract data from api/Market/[Market_ID]/Contracts
                path = base_path + market_id + '/Contracts'
                url = build_url(base_url, path, query)
                self.log(f'url: {url}')
                yield scrapy.Request(url=url, callback=self.parse_contracts, meta={'loader': loader})
            else:
                #otherwise, take the contract data already returned by api/marketdata/all/
                self.log('parse: MarketContracts')
                item = MarketContractsItem
                loader.add_value('Contracts', self.get_contracts(market,item))
                yield loader.load_item()

In the parse callback we get the list of market IDs and, for each market, perform one of two parsing tasks:

  1. Parse the basic contract data already returned by api/marketdata/all/
  2. Crawl and parse more detailed contract data for the market through api/Market/[Market_ID]/Contracts, if the market ID is in our MARKET_WATCHLIST

The item classes used by the spider and the pipeline (MarketsItem, MarketContractsItem, ContractContractsItem) live in the project's items.py; a sketch follows below.
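A minimal sketch of those item classes, with field names taken from the two API responses and from the pipeline code in the next section; the field lists are illustrative rather than complete:

import scrapy

class MarketsItem(scrapy.Item):
    # Market-level fields populated in parse()
    ID = scrapy.Field()
    Name = scrapy.Field()
    URL = scrapy.Field()
    Time = scrapy.Field()
    Contracts = scrapy.Field()

class MarketContractsItem(scrapy.Item):
    # Contract fields as returned by api/marketdata/all/
    ID = scrapy.Field()
    Name = scrapy.Field()
    BestBuyYesCost = scrapy.Field()
    BestBuyNoCost = scrapy.Field()
    LastTradePrice = scrapy.Field()
    LastClosePrice = scrapy.Field()

class ContractContractsItem(scrapy.Item):
    # Contract fields as returned by api/Market/[Market_ID]/Contracts
    ContractId = scrapy.Field()
    ContractName = scrapy.Field()
    BestYesPrice = scrapy.Field()
    BestNoPrice = scrapy.Field()
    BestYesQuantity = scrapy.Field()
    BestNoQuantity = scrapy.Field()
    LastTradePrice = scrapy.Field()
    LastClosePrice = scrapy.Field()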

Pipeline

After an item has been scraped by a spider, it is sent to the Item Pipeline, which processes it through several components that are executed sequentially. In our pipeline class, MarketsPipeline, the process_item method is called for every item the spider yields.

class MarketsPipeline:
    # default constructor
    def __init__(self):
        self.write_api = None
        self.client = None
    def open_spider(self, spider):
        _logging.info('create Timestream connection...')
        try:
            self.client = boto3.client('timestream-write')
        except Exception as e:
            _logging.error('connection error: %s', traceback.format_exc())
    @staticmethod
    def _current_milli_time():
        return str(int(round(time.time() * 1000)))
    def process_item(self, item, spider):
        _logging.info("Item: "+str(item))
        measurements = []
        def def_value():
            return None
        def prepare_common_attributes(Market_ID, Market_Name,Contract_ID,Contract_Name):
            common_attributes = {
                'Dimensions': [
                    {'Name': 'Market_Name', 'Value': Market_Name},
                    {'Name': 'Market_ID', 'Value': Market_ID},
                    {'Name': 'Contract_ID', 'Value': Contract_ID},
                    {'Name': 'Contract_Name', 'Value': Contract_Name}
                ],
                'MeasureName': 'market_tick',
                'MeasureValueType': 'MULTI'
            }
            return common_attributes
        def prepare_record(current_time):
            record = {
                'Time': str(current_time),
                'MeasureValues': []
            }
            return record
        def prepare_measure(measure_name, measure_value, measure_type):
            measure = {
                'Name': measure_name,
                'Value': str(measure_value),
                'Type': measure_type
            }
            return measure
        time = datetime.utcnow().isoformat()

Inside process_item, we first implement some helper functions that construct our Timestream schema.

  • prepare_common_attributes builds our Timestream dimensions
  • prepare_record is the top-level container that wraps the measures for a single record
  • prepare_measure builds each Timestream measure

Our table uses the multi-measure records schema, which allows multiple measures per record. Below is the layout built by our helper functions; it represents what an entry in our Timestream table looks like.

Column Type Timestream attribute type
Contract_ID varchar DIMENSION
Market_Name varchar DIMENSION
Market_ID varchar DIMENSION
Contract_Name varchar DIMENSION
measure_name varchar MEASURE_NAME
BestNoPrice double MULTI
BestYesQuantity bigint MULTI
BestYesPrice double MULTI
BestNoQuantity bigint MULTI
LastTradePrice double MULTI
LastClosePrice double MULTI
time timestamp TIMESTAMP
        for contract in item['Contracts']:
            contractsItemDict=defaultdict(def_value)
            for field in MarketContractsItem.fields.keys():
                contractsItemDict[field] = contract.get(field)
            for field in ContractContractsItem.fields.keys():
                contractsItemDict[field] = contract.get(field)
            current_time = self._current_milli_time()

            common_attributes = prepare_common_attributes(str(item['ID']),
                                                          str(item['Name']),
                                                          str(contractsItemDict['ID'] if contractsItemDict['ID'] else contractsItemDict['ContractId']),
                                                          str(contractsItemDict['Name'] if contractsItemDict['Name'] else contractsItemDict['ContractName'])    )
            record = prepare_record(current_time)

            contractBestNoPrice=contractsItemDict['BestNoPrice'] if contractsItemDict['BestNoPrice'] else contractsItemDict['BestBuyNoCost']
            if contractBestNoPrice is not None:
                record['MeasureValues'].append(prepare_measure('BestNoPrice', contractBestNoPrice,'DOUBLE'))
            contractBestBuyYesCost=contractsItemDict['BestYesPrice'] if contractsItemDict['BestYesPrice'] else contractsItemDict['BestBuyYesCost']
            if contractBestBuyYesCost is not None:
                record['MeasureValues'].append(prepare_measure('BestYesPrice', contractBestBuyYesCost,'DOUBLE'))
            contractLastClosePrice=contractsItemDict['LastClosePrice']
            if contractLastClosePrice is not None:
                record['MeasureValues'].append(prepare_measure('LastClosePrice', contractLastClosePrice,'DOUBLE'))
            contractLastTradePrice=contractsItemDict['LastTradePrice']
            if contractLastTradePrice is not None:
                record['MeasureValues'].append(prepare_measure('LastTradePrice', contractLastTradePrice,'DOUBLE'))
            contractBestYesQuantity=contractsItemDict['BestYesQuantity']
            if contractBestYesQuantity is not None:
                record['MeasureValues'].append(prepare_measure('BestYesQuantity', contractBestYesQuantity,'BIGINT'))
            contractBestNoQuantity=contractsItemDict['BestNoQuantity']
            if contractBestNoQuantity is not None:
                record['MeasureValues'].append(prepare_measure('BestNoQuantity', contractBestNoQuantity,'BIGINT'))
            records=[record]
            try:
                _logging.info(f'records:{records}')
                result = self.client.write_records(DatabaseName=settings.DATABASE_NAME,
                                                   TableName=settings.TABLE_NAME,
                                                   Records=records,
                                                   CommonAttributes=common_attributes)
            except Exception as e:
                _logging.error('write_records error: %s', traceback.format_exc())
        # pass the item along in case further pipeline components are enabled
        return item

Our pipeline class, MarketsPipeline, loops through the parsed contracts and writes records to our Timestream table.
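For reference, a single write_records call assembled by the helper functions above looks roughly like the following. The dimensions reuse the Arizona Senate market from earlier; the prices, quantities, and timestamp are made-up illustrative values:

import boto3

# Illustrative payload only -- the measure values and timestamp below are made up
common_attributes = {
    'Dimensions': [
        {'Name': 'Market_Name', 'Value': 'Which party will win the 2024 US Senate election in Arizona?'},
        {'Name': 'Market_ID', 'Value': '8070'},
        {'Name': 'Contract_ID', 'Value': '31257'},
        {'Name': 'Contract_Name', 'Value': 'Democratic'},
    ],
    'MeasureName': 'market_tick',
    'MeasureValueType': 'MULTI',
}

records = [{
    'Time': '1696302600000',  # epoch milliseconds, as produced by _current_milli_time()
    'MeasureValues': [
        {'Name': 'BestYesPrice', 'Value': '0.08', 'Type': 'DOUBLE'},
        {'Name': 'BestNoPrice', 'Value': '0.95', 'Type': 'DOUBLE'},
        {'Name': 'BestYesQuantity', 'Value': '250', 'Type': 'BIGINT'},
        {'Name': 'BestNoQuantity', 'Value': '310', 'Type': 'BIGINT'},
        {'Name': 'LastTradePrice', 'Value': '0.07', 'Type': 'DOUBLE'},
        {'Name': 'LastClosePrice', 'Value': '0.07', 'Type': 'DOUBLE'},
    ],
}]

boto3.client('timestream-write').write_records(
    DatabaseName='prediction_markets',
    TableName='predictit',
    Records=records,
    CommonAttributes=common_attributes,
)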

Conclusion

In this section we're just performing an initial exploration into scraping predictit markets with our Scrapy framework. We can test it by running our spider on an EC2 instance in the same region as our Timestream database and executing scrapy crawl markets. You should see several pages of Scrapy logs printed to the screen:

 'Time': '2023-10-03T03:16:50.825516',
 'URL': 'https://www.predictit.org/markets/detail/8077/What-will-be-the-Electoral-College-margin-in-the-2024-presidential-election'}
2023-10-03 03:16:57 [scrapy.core.engine] INFO: Closing spider (finished)
2023-10-03 03:16:57 [root] INFO: close Timestream...
2023-10-03 03:16:57 [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 1358,
 'downloader/request_count': 5,
 'downloader/request_method_count/GET': 5,
 'downloader/response_bytes': 26745,
 'downloader/response_count': 5,
 'downloader/response_status_count/200': 5,
 'elapsed_time_seconds': 7.34493,
 'finish_reason': 'finished',
 'finish_time': datetime.datetime(2023, 10, 3, 3, 16, 57, 608700),
 'httpcompression/response_bytes': 116684,
 'httpcompression/response_count': 5,
 'item_scraped_count': 20,
 'log_count/DEBUG': 3275,
 'log_count/INFO': 411,
 'log_count/WARNING': 1,
 'memusage/max': 76316672,
 'memusage/startup': 76316672,
 'request_depth_max': 1,
 'response_received_count': 5,
 'robotstxt/request_count': 1,
 'robotstxt/response_count': 1,
 'robotstxt/response_status_count/200': 1,
 'scheduler/dequeued': 4,
 'scheduler/dequeued/memory': 4,
 'scheduler/enqueued': 4,
 'scheduler/enqueued/memory': 4,
 'start_time': datetime.datetime(2023, 10, 3, 3, 16, 50, 263770)}
2023-10-03 03:16:57 [scrapy.core.engine] INFO: Spider closed (finished)

Querying our Timestream database shows data is being populated.

Timestream_create_database
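The same check can be run programmatically with the Timestream query API. A minimal sketch; the SQL is just an illustrative recent-rows query against the schema defined earlier:

import boto3

query_client = boto3.client('timestream-query')
result = query_client.query(
    QueryString="""
        SELECT Contract_Name, time, BestYesPrice, BestNoPrice, LastTradePrice
        FROM "prediction_markets"."predictit"
        WHERE time > ago(15m)
        ORDER BY time DESC
        LIMIT 10
    """
)

# Each row comes back as a list of Data cells in column order
for row in result['Rows']:
    print([cell.get('ScalarValue') for cell in row['Data']])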

In the next section, we'll:

  1. Move our Scrapy code into a Lambda function
  2. Create an EventBridge trigger to execute the Lambda function on a schedule
  3. Generate true time series data for our Timestream database