A factor is a function from an asset and a moment in time to a number.
F(asset, timestamp) -> float
In Pipeline, Factors are the most commonly used kind of term, representing the result of any computation that produces a numerical result. Factors require a column of data and a window length as input.
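As a toy sketch in plain Python (an illustration of the abstraction only, not the Pipeline API; the function name and body are hypothetical):

# A factor maps an (asset, timestamp) pair to a number.
def my_factor(asset, timestamp):
    """Hypothetical: return some numerical value for asset as of timestamp."""
    ...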
The simplest factors in Pipeline are built-in Factors. Built-in Factors are pre-built to perform common computations. As a first example, let's make a factor to compute the average close price over the last 10 days. We can use the SimpleMovingAverage built-in factor, which computes the average value of the input data (close price) over the specified window length (10 days). To do this, we need to import our built-in SimpleMovingAverage factor and the EquityPricing dataset.
# New from the last lesson, import the EquityPricing dataset.
from zipline.pipeline import Pipeline, EquityPricing
from zipline.research import run_pipeline
# New from the last lesson, import the built-in SimpleMovingAverage factor.
from zipline.pipeline.factors import SimpleMovingAverage
To see the full list of built-in factors, click on the factors module in the above import statement and press Control, or see the API Reference.
Let's go back to our make_pipeline function from the previous lesson and instantiate a SimpleMovingAverage factor. To create a SimpleMovingAverage factor, we call the SimpleMovingAverage constructor with two arguments: inputs, which must be a list of BoundColumn objects, and window_length, which must be an integer indicating how many days' worth of data our moving average calculation should receive. (We'll discuss BoundColumn in more depth later; for now we just need to know that a BoundColumn is an object indicating what kind of data should be passed to our factor.)
The following line creates a Factor for computing the 10-day mean close price of securities.
mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
It's important to note that creating the factor does not actually perform a computation. Creating a factor is like defining a function: to perform the computation, we need to add the factor to our pipeline and run it.
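The analogy in plain Python (a rough sketch, not Pipeline internals; ten_day_mean is a hypothetical function):

# Defining this function performs no computation by itself...
def ten_day_mean(closes):
    return sum(closes[-10:]) / 10

# ...work happens only when it is called, just as a factor computes only
# when its pipeline runs.
ten_day_mean([30.1, 30.4, 30.2, 30.8, 31.0, 30.9, 31.2, 31.1, 30.7, 31.3])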
Let's update our original empty pipeline to make it compute our new moving average factor. To start, let's move our factor instantiation into make_pipeline. Next, we can tell our pipeline to compute our factor by passing it a columns argument, which should be a dictionary mapping column names to factors, filters, or classifiers. Our updated make_pipeline function should look something like this:
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10
        }
    )
To see what this looks like, let's make our pipeline, run it, and display the result.
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result
| date       | asset                          | 10_day_mean_close |
|------------|--------------------------------|-------------------|
| 2010-01-05 | Equity(FIBBG000C2V3D6 [A])     | 30.432000         |
|            | Equity(QI000000004076 [AABA])  | 16.605000         |
|            | Equity(FIBBG000BZWHH8 [AACC])  | 6.434000          |
|            | Equity(FIBBG000V2S3P6 [AACG])  | 4.501444          |
|            | Equity(FIBBG000M7KQ09 [AAI])   | 5.250000          |
| ...        | ...                            | ...               |
|            | Equity(FIBBG011MC2100 [AATC])  | 11.980500         |
|            | Equity(FIBBG000GDBDH4 [BDG])   | NaN               |
|            | Equity(FIBBG000008NR0 [ISM])   | NaN               |
|            | Equity(FIBBG000GZ24W8 [PEM])   | NaN               |
|            | Equity(FIBBG000BB5S87 [HCH])   | 106.570000        |
7841 rows × 1 columns
Now we have a column in our pipeline output with the 10-day average close price for all ~8000 securities (display truncated). Note that each row corresponds to the result of our computation for a given security on a given date. The DataFrame has a MultiIndex whose first level is a datetime representing the date of the computation and whose second level is an Equity object corresponding to the security.
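Because the output is an ordinary pandas DataFrame, we can use standard MultiIndex operations on it. A quick sketch using the result variable from above:

# The two index levels: computation dates and Equity objects
result.index.get_level_values('date').unique()
result.index.get_level_values('asset')

# Select the cross-section of results for a single date
result.loc['2010-01-05'].head()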
If we run our pipeline over more than one day, the output looks like this.
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-07')
result
| date       | asset                          | 10_day_mean_close |
|------------|--------------------------------|-------------------|
| 2010-01-05 | Equity(FIBBG000C2V3D6 [A])     | 30.432000         |
|            | Equity(QI000000004076 [AABA])  | 16.605000         |
|            | Equity(FIBBG000BZWHH8 [AACC])  | 6.434000          |
|            | Equity(FIBBG000V2S3P6 [AACG])  | 4.501444          |
|            | Equity(FIBBG000M7KQ09 [AAI])   | 5.250000          |
| ...        | ...                            | ...               |
| 2010-01-07 | Equity(FIBBG011MC2100 [AATC])  | 11.816000         |
|            | Equity(FIBBG000GDBDH4 [BDG])   | NaN               |
|            | Equity(FIBBG000008NR0 [ISM])   | NaN               |
|            | Equity(FIBBG000GZ24W8 [PEM])   | NaN               |
|            | Equity(FIBBG000BB5S87 [HCH])   | 109.796667        |
23534 rows × 1 columns
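With multiple dates in the index, standard pandas operations also let us view a single security across dates. A quick sketch (the asset is taken from the output's own index):

# Pull one security's time series of factor values across all dates
asset = result.index.get_level_values('asset')[0]
result.xs(asset, level='asset')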
Note: factors can also be added to an existing Pipeline instance using the Pipeline.add method. Using add looks something like this:
my_pipe = Pipeline()
f1 = SomeFactor(...)
my_pipe.add(f1, 'f1')
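For instance, to add the moving average factor from earlier to an empty pipeline:

my_pipe = Pipeline()
mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
# Register the factor as an output column named '10_day_mean_close'
my_pipe.add(mean_close_10, '10_day_mean_close')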
The most commonly used built-in Factor is Latest. The Latest factor gets the most recent value of a given data column. This factor is common enough that it is instantiated differently from other factors: the best way to get the latest value of a data column is via its .latest attribute. As an example, let's update make_pipeline to create a latest close price factor and add it to our pipeline:
def make_pipeline():
    mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    latest_close = EquityPricing.close.latest
    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10,
            'latest_close_price': latest_close
        }
    )
And now, when we make and run our pipeline again, there are two columns in our output DataFrame. One column has the 10-day mean close price of each security, and the other has the latest close price.
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result.head(5)
| date       | asset                          | 10_day_mean_close | latest_close_price |
|------------|--------------------------------|-------------------|--------------------|
| 2010-01-05 | Equity(FIBBG000C2V3D6 [A])     | 30.432000         | 31.300             |
|            | Equity(QI000000004076 [AABA])  | 16.605000         | 17.100             |
|            | Equity(FIBBG000BZWHH8 [AACC])  | 6.434000          | 7.150              |
|            | Equity(FIBBG000V2S3P6 [AACG])  | 4.501444          | 4.702              |
|            | Equity(FIBBG000M7KQ09 [AAI])   | 5.250000          | 5.180              |
.latest can sometimes return things other than Factors. We'll see examples of other possible return types in later lessons.
Some factors have default inputs that should never be changed. For example, the VWAP built-in factor is always calculated from EquityPricing.close and EquityPricing.volume. When a factor is always calculated from the same BoundColumns, we can call the constructor without specifying inputs.
from zipline.pipeline.factors import VWAP
vwap = VWAP(window_length=10)
When choosing a start_date for run_pipeline, there are two gotchas to keep in mind. First, the earliest start_date you can specify is one day after the start date of the bundle. This is because the start_date you pass to run_pipeline indicates the first date you want to include in the pipeline output, and each day's pipeline output is based on the previous day's data. The purpose of this one-day lag is to avoid lookahead bias: pipeline output tells you what you would have known at the start of each day, based on the previous day's data.
The learning bundle starts on 2007-01-03 (the first trading day of 2007), but if we try to run a pipeline that starts on (or before) that date, we'll get an error that tells us to start one day after the bundle start date:
result = run_pipeline(Pipeline(), start_date='2007-01-03', end_date='2007-01-03')
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In[9], line 1
----> 1 result = run_pipeline(Pipeline(), start_date='2007-01-03', end_date='2007-01-03')

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:95, in run_pipeline(pipeline, start_date, end_date, bundle)
     36 def run_pipeline(
     37     pipeline: Pipeline,
     38     start_date: str,
     39     end_date: str = None,
     40     bundle: str = None
     41 ) -> pd.DataFrame:
     42     """
     43     Compute values for pipeline from start_date to end_date, using the specified
     44     bundle or the default bundle.
   (...)
     93     factor = run_pipeline(pipeline, '2018-01-01', '2019-02-01', bundle="usstock-1min")
     94     """
---> 95     return _run_pipeline(
     96         pipeline,
     97         start_date=start_date,
     98         end_date=end_date,
     99         bundle=bundle)

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:149, in _run_pipeline(pipeline, start_date, end_date, bundle, mask)
    147 second_session = exchange_calendar.next_session(first_session)
    148 if start_date < second_session:
--> 149     raise ValidationError(
    150         f"start_date cannot be earlier than {second_session.date().isoformat()} "
    151         f"for this bundle (one session after the bundle start date of {first_session.date().isoformat()})")
    153 # Roll-forward start_date to valid session
    154 for i in range(100):

ValidationError: start_date cannot be earlier than 2007-01-04 for this bundle (one session after the bundle start date of 2007-01-03)
The second gotcha to keep in mind is that the start_date you choose must also make allowance for the window_length of your factors. The following pipeline includes a 10-day VWAP factor, so if we set the start_date to 2007-01-04 (as suggested by the previous error message), we will get a new error (scroll to the bottom of the traceback for the useful error message):
pipeline = Pipeline(
    columns={
        "vwap": VWAP(window_length=10)
    }
)
result = run_pipeline(pipeline, start_date='2007-01-04', end_date='2007-01-04')
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File index.pyx:598, in pandas._libs.index.DatetimeEngine.get_loc()

File pandas/_libs/hashtable_class_helper.pxi:2606, in pandas._libs.hashtable.Int64HashTable.get_item()

File pandas/_libs/hashtable_class_helper.pxi:2630, in pandas._libs.hashtable.Int64HashTable.get_item()

KeyError: 1166400000000000000

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/base.py:3790, in Index.get_loc(self, key)
   3789 try:
-> 3790     return self._engine.get_loc(casted_key)
   3791 except KeyError as err:

File index.pyx:566, in pandas._libs.index.DatetimeEngine.get_loc()

File index.pyx:600, in pandas._libs.index.DatetimeEngine.get_loc()

KeyError: Timestamp('2006-12-18 00:00:00')

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:631, in DatetimeIndex.get_loc(self, key)
    630 try:
--> 631     return Index.get_loc(self, key)
    632 except KeyError as err:

File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/base.py:3797, in Index.get_loc(self, key)
   3796     raise InvalidIndexError(key)
-> 3797     raise KeyError(key) from err
   3798 except TypeError:
   3799     # If we have a listlike key, _check_indexing_error will raise
   3800     # InvalidIndexError. Otherwise we fall through and re-raise
   3801     # the TypeError.

KeyError: Timestamp('2006-12-18 00:00:00')

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:578, in BcolzDailyBarReader._load_raw_arrays_date_to_index(self, date)
    577 try:
--> 578     return self.sessions.get_loc(date)
    579 except KeyError:

File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:633, in DatetimeIndex.get_loc(self, key)
    632 except KeyError as err:
--> 633     raise KeyError(orig_key) from err

KeyError: Timestamp('2006-12-18 00:00:00')

During handling of the above exception, another exception occurred:

NoDataOnDate                              Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:763, in SimplePipelineEngine.compute_chunk(self, graph, dates, sids, workspace, refcounts, execution_order, hooks)
    762 try:
--> 763     loaded = loader.load_adjusted_array(
    764         domain, to_load, mask_dates, sids, mask,
    765     )
    766 except NoDataOnDate as e:

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/loaders/equity_pricing_loader.py:90, in EquityPricingLoader.load_adjusted_array(***failed resolving arguments***)
     88 ohlcv_colnames = [c.name for c in ohlcv_cols]
---> 90 raw_ohlcv_arrays = self.raw_price_reader.load_raw_arrays(
     91     ohlcv_colnames,
     92     shifted_dates[0],
     93     shifted_dates[-1],
     94     sids,
     95 )
     97 # Currency convert raw_arrays in place if necessary. We use shifted
     98 # dates to load currency conversion rates to make them line up with
     99 # dates used to fetch prices.

File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:557, in BcolzDailyBarReader.load_raw_arrays(self, columns, start_date, end_date, assets)
    556 def load_raw_arrays(self, columns, start_date, end_date, assets):
--> 557     start_idx = self._load_raw_arrays_date_to_index(start_date)
    558     end_idx = self._load_raw_arrays_date_to_index(end_date)

File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:580, in BcolzDailyBarReader._load_raw_arrays_date_to_index(self, date)
    579 except KeyError:
--> 580     raise NoDataOnDate(date)

NoDataOnDate: 2006-12-18 00:00:00

During handling of the above exception, another exception occurred:

NoDataOnDate                              Traceback (most recent call last)
Cell In[10], line 7
      1 pipeline = Pipeline(
      2     columns={
      3         "vwap": VWAP(window_length=10)
      4     }
      5 )
----> 7 result = run_pipeline(pipeline, start_date='2007-01-04', end_date='2007-01-04')

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:95, in run_pipeline(pipeline, start_date, end_date, bundle)
     36 def run_pipeline(
     37     pipeline: Pipeline,
     38     start_date: str,
     39     end_date: str = None,
     40     bundle: str = None
     41 ) -> pd.DataFrame:
     42     """
     43     Compute values for pipeline from start_date to end_date, using the specified
     44     bundle or the default bundle.
   (...)
     93     factor = run_pipeline(pipeline, '2018-01-01', '2019-02-01', bundle="usstock-1min")
     94     """
---> 95     return _run_pipeline(
     96         pipeline,
     97         start_date=start_date,
     98         end_date=end_date,
     99         bundle=bundle)

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:251, in _run_pipeline(pipeline, start_date, end_date, bundle, mask)
    248 if use_chunks:
    249     # Run in 1-years chunks to reduce memory usage
    250     chunksize = 252
--> 251     results = engine.run_chunked_pipeline(pipeline, start_date, end_date, chunksize=chunksize)
    252 else:
    253     results = engine.run_pipeline(pipeline, start_date, end_date)

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:350, in SimplePipelineEngine.run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize, hooks)
    348 run_pipeline = partial(self._run_pipeline_impl, pipeline, hooks=hooks)
    349 with hooks.running_pipeline(pipeline, start_date, end_date):
--> 350     chunks = [run_pipeline(s, e) for s, e in ranges]
    352 if len(chunks) == 1:
    353     # OPTIMIZATION: Don't make an extra copy in `categorical_df_concat`
    354     # if we don't have to.
    355     return chunks[0]

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:350, in <listcomp>(.0)
    348 run_pipeline = partial(self._run_pipeline_impl, pipeline, hooks=hooks)
    349 with hooks.running_pipeline(pipeline, start_date, end_date):
--> 350     chunks = [run_pipeline(s, e) for s, e in ranges]
    352 if len(chunks) == 1:
    353     # OPTIMIZATION: Don't make an extra copy in `categorical_df_concat`
    354     # if we don't have to.
    355     return chunks[0]

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:440, in SimplePipelineEngine._run_pipeline_impl(self, pipeline, start_date, end_date, hooks)
    434 execution_order = plan.execution_order(workspace, refcounts)
    436 with hooks.computing_chunk(execution_order,
    437                            start_date,
    438                            end_date):
--> 440     results = self.compute_chunk(
    441         graph=plan,
    442         dates=dates,
    443         sids=sids,
    444         workspace=workspace,
    445         refcounts=refcounts,
    446         execution_order=execution_order,
    447         hooks=hooks,
    448     )
    450 return self._to_narrow(
    451     plan.outputs,
    452     results,
   (...)
    455     sids,
    456 )

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:777, in SimplePipelineEngine.compute_chunk(self, graph, dates, sids, workspace, refcounts, execution_order, hooks)
    767     extra_rows = graph.extra_rows[term]
    768     msg = (
    769         f"the pipeline definition requires {term} data on {str(e)} but no bundle data is "
    770         "available on that date; the cause of this issue is that another pipeline term needs "
   (...)
    775         f"the problem:\n\n{repr(graph)}"
    776     )
--> 777     raise NoDataOnDate(msg)
    778 assert set(loaded) == set(to_load), (
    779     'loader did not return an AdjustedArray for each column\n'
    780     'expected: %r\n'
   (...)
    784     )
    785 )
    786 workspace.update(loaded)

NoDataOnDate: the pipeline definition requires EquityPricing<US>.close::float64 data on 2006-12-18 00:00:00 but no bundle data is available on that date; the cause of this issue is that another pipeline term needs EquityPricing<US>.close::float64 and has a window_length of 10, which necessitates loading 9 extra rows of EquityPricing<US>.close::float64; try setting a later start date so that the maximum window_length of any term doesn't extend further back than the bundle start date. Review the pipeline dependencies below to help determine which terms are causing the problem:

{'dependencies': [{'term': EquityPricing<US>.close::float64,
                   'used_by': VWAP([EquityPricing.close, EquityPricing.volume], 10)},
                  {'term': EquityPricing<US>.volume::float64,
                   'used_by': VWAP([EquityPricing.close, EquityPricing.volume], 10)}],
 'nodes': [{'extra_rows': 9, 'needed_for': EquityPricing<US>.close::float64},
           {'extra_rows': 9, 'needed_for': EquityPricing<US>.volume::float64}]}
The error message indicates that we would need data back to 2006-12-18 in order to calculate a 10-day VWAP and produce pipeline output on 2007-01-04 (window_length is measured in trading days, not calendar days). The solution is to set a later start date so that the VWAP factor doesn't require data prior to the bundle start date of 2007-01-03. In this example, the earliest possible start_date turns out to be 2007-01-18, the 10th trading day after the bundle start date of 2007-01-03: a 10-day window needs 10 prior sessions of data.
result = run_pipeline(pipeline, start_date='2007-01-18', end_date='2007-01-18')
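Rather than counting sessions by hand, we can compute the earliest valid start date from the trading calendar. The following is a minimal sketch using the exchange_calendars package, assuming the bundle follows the NYSE (XNYS) calendar; earliest_valid_start is a hypothetical helper, not part of zipline:

import exchange_calendars as xcals

cal = xcals.get_calendar("XNYS")  # assumption: the bundle uses the NYSE calendar

def earliest_valid_start(bundle_start, window_length):
    # Output on date D requires window_length sessions of data ending the
    # session before D, so step forward window_length sessions from the
    # bundle's first session.
    session = cal.date_to_session(bundle_start, direction="next")
    for _ in range(window_length):
        session = cal.next_session(session)
    return session

earliest_valid_start("2007-01-03", 10)  # Timestamp('2007-01-18 00:00:00')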
Next Lesson: Combining Factors