

Pipeline Tutorial › Lesson 4: Factors


Factors

A factor is a function from an asset and a moment in time to a number.

F(asset, timestamp) -> float

In Pipeline, Factors are the most commonly used type of term, representing the result of any computation that produces a numerical result. Factors require a column of data and a window length as input.

The simplest factors in Pipeline are built-in Factors. Built-in Factors are pre-built to perform common computations. As a first example, let's make a factor to compute the average close price over the last 10 days. We can use the SimpleMovingAverage built-in factor which computes the average value of the input data (close price) over the specified window length (10 days). To do this, we need to import our built-in SimpleMovingAverage factor and the EquityPricing dataset.

In [1]:
# New from the last lesson, import the EquityPricing dataset.
from zipline.pipeline import Pipeline, EquityPricing
from zipline.research import run_pipeline

# New from the last lesson, import the built-in SimpleMovingAverage factor.
from zipline.pipeline.factors import SimpleMovingAverage

To see the full list of built-in factors, Control-click the factors module in the above import statement to jump to its definition, or see the API Reference.

Creating a Factor

Let's go back to our make_pipeline function from the previous lesson and instantiate a SimpleMovingAverage factor. To create a SimpleMovingAverage factor, we can call the SimpleMovingAverage constructor with two arguments: inputs, which must be a list of BoundColumn objects, and window_length, which must be an integer indicating how many days' worth of data our moving average calculation should receive. (We'll discuss BoundColumn in more depth later; for now we just need to know that a BoundColumn is an object indicating what kind of data should be passed to our Factor.)

The following line creates a Factor for computing the 10-day mean close price of securities.

In [2]:
mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)

It's important to note that creating the factor does not actually perform a computation. Creating a factor is like defining the function. To perform a computation, we need to add the factor to our pipeline and run it.
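To make the deferred-computation idea concrete, here is a minimal pure-Python stand-in (a hypothetical class, not zipline itself): constructing the object only records the window length, and nothing is computed until the computation is explicitly run.

```python
# Hypothetical stand-in illustrating deferred computation (not zipline's API):
class DeferredMean:
    def __init__(self, window_length):
        # Construction only stores parameters; no data is touched yet.
        self.window_length = window_length

    def compute(self, values):
        # The actual computation happens only when explicitly run.
        window = values[-self.window_length:]
        return sum(window) / len(window)

factor = DeferredMean(window_length=3)              # defines the computation
result = factor.compute([10.0, 11.0, 12.0, 13.0])  # runs it: (11+12+13)/3
```

In Pipeline, the "run" step is performed for you by run_pipeline, across all assets and dates at once.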

Adding a Factor to a Pipeline

Let's update our original empty pipeline to make it compute our new moving average factor. To start, let's move our factor instantiation into make_pipeline. Next, we can tell our pipeline to compute our factor by passing it a columns argument, which should be a dictionary mapping column names to factors, filters, or classifiers. Our updated make_pipeline function should look something like this:

In [3]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    
    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10
        }
    )

To see what this looks like, let's make our pipeline, run it, and display the result.

In [4]:
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result
Out[4]:
                                           10_day_mean_close
date       asset
2010-01-05 Equity(FIBBG000C2V3D6 [A])              30.432000
           Equity(QI000000004076 [AABA])           16.605000
           Equity(FIBBG000BZWHH8 [AACC])            6.434000
           Equity(FIBBG000V2S3P6 [AACG])            4.501444
           Equity(FIBBG000M7KQ09 [AAI])             5.250000
...                                                      ...
           Equity(FIBBG011MC2100 [AATC])           11.980500
           Equity(FIBBG000GDBDH4 [BDG])                  NaN
           Equity(FIBBG000008NR0 [ISM])                  NaN
           Equity(FIBBG000GZ24W8 [PEM])                  NaN
           Equity(FIBBG000BB5S87 [HCH])           106.570000

7841 rows × 1 columns

Now we have a column in our pipeline output with the 10-day average close price for all ~8000 securities (display truncated). Note that each row corresponds to the result of our computation for a given security on a given date. The DataFrame has a MultiIndex where the first level is a datetime representing the date of the computation and the second level is an Equity object corresponding to the security.
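Working with this MultiIndex is ordinary pandas. The sketch below builds a tiny synthetic frame with the same (date, asset) index shape as pipeline output (the asset labels and values are made up for illustration) and shows how to slice it by date or by asset:

```python
import pandas as pd

# A synthetic frame shaped like pipeline output; labels/values are made up.
index = pd.MultiIndex.from_product(
    [pd.to_datetime(['2010-01-05', '2010-01-06']), ['A', 'AABA']],
    names=['date', 'asset'],
)
result = pd.DataFrame(
    {'10_day_mean_close': [30.432, 16.605, 30.501, 16.710]}, index=index
)

# All assets on one date:
one_day = result.xs(pd.Timestamp('2010-01-05'), level='date')

# One asset across all dates:
one_asset = result.xs('AABA', level='asset')
```

The same .xs (or .loc) patterns work on real run_pipeline output.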

If we run our pipeline over more than one day, the output looks like this.

In [5]:
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-07')
result
Out[5]:
                                           10_day_mean_close
date       asset
2010-01-05 Equity(FIBBG000C2V3D6 [A])              30.432000
           Equity(QI000000004076 [AABA])           16.605000
           Equity(FIBBG000BZWHH8 [AACC])            6.434000
           Equity(FIBBG000V2S3P6 [AACG])            4.501444
           Equity(FIBBG000M7KQ09 [AAI])             5.250000
...        ...                                           ...
2010-01-07 Equity(FIBBG011MC2100 [AATC])           11.816000
           Equity(FIBBG000GDBDH4 [BDG])                  NaN
           Equity(FIBBG000008NR0 [ISM])                  NaN
           Equity(FIBBG000GZ24W8 [PEM])                  NaN
           Equity(FIBBG000BB5S87 [HCH])           109.796667

23534 rows × 1 columns

Note: factors can also be added to an existing Pipeline instance using the Pipeline.add method. Using add looks something like this:

my_pipe = Pipeline()
f1 = SomeFactor(...)
my_pipe.add(f1, 'f1')

Latest

The most commonly used built-in Factor is Latest. The Latest factor gets the most recent value of a given data column. This factor is common enough that it is instantiated differently from other factors. The best way to get the latest value of a data column is by getting its .latest attribute. As an example, let's update make_pipeline to create a latest close price factor and add it to our pipeline:

In [6]:
def make_pipeline():

    mean_close_10 = SimpleMovingAverage(inputs=[EquityPricing.close], window_length=10)
    latest_close = EquityPricing.close.latest

    return Pipeline(
        columns={
            '10_day_mean_close': mean_close_10,
            'latest_close_price': latest_close
        }
    )

And now, when we make and run our pipeline again, there are two columns in our output dataframe. One column has the 10-day mean close price of each security, and the other has the latest close price.

In [7]:
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result.head(5)
Out[7]:
                                         10_day_mean_close  latest_close_price
date       asset
2010-01-05 Equity(FIBBG000C2V3D6 [A])            30.432000              31.300
           Equity(QI000000004076 [AABA])         16.605000              17.100
           Equity(FIBBG000BZWHH8 [AACC])          6.434000               7.150
           Equity(FIBBG000V2S3P6 [AACG])          4.501444               4.702
           Equity(FIBBG000M7KQ09 [AAI])           5.250000               5.180

.latest can sometimes return things other than Factors. We'll see examples of other possible return types in later lessons.
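A pandas analogy (not zipline itself) may help fix the relationship between the two columns: .latest corresponds to taking the most recent value in a series, while SimpleMovingAverage averages the trailing window. Prices below are made up:

```python
import pandas as pd

# Made-up close prices for one security over three sessions.
closes = pd.Series(
    [30.1, 30.9, 31.3],
    index=pd.to_datetime(['2010-01-01', '2010-01-04', '2010-01-05']),
)

latest_close = closes.iloc[-1]                    # analogous to close.latest
mean_close_3 = closes.rolling(3).mean().iloc[-1]  # analogous to a 3-day SMA
```

Pipeline performs the equivalent lookups for every security on every date in the requested range.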

Default Inputs

Some factors have default inputs that should never be changed. For example, the VWAP built-in factor is always calculated from EquityPricing.close and EquityPricing.volume. When a factor is always calculated from the same BoundColumns, we can call the constructor without specifying inputs.

In [8]:
from zipline.pipeline.factors import VWAP
vwap = VWAP(window_length=10)
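VWAP is the volume-weighted average price over the window: the sum of close × volume divided by total volume. A quick arithmetic sketch of that formula, using made-up prices and volumes:

```python
# VWAP over a window: sum(close * volume) / sum(volume).
# Prices and volumes below are made up for illustration.
closes  = [10.0, 11.0, 12.0]
volumes = [100.0, 200.0, 100.0]

vwap = sum(c * v for c, v in zip(closes, volumes)) / sum(volumes)
# (10*100 + 11*200 + 12*100) / 400 = 11.0
```

This is why VWAP needs both the close and volume columns, and why those inputs never change.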

Choosing a Start Date

When choosing a start_date for run_pipeline, there are two gotchas to keep in mind. First, the earliest start_date you can specify is one day after the start date of the bundle. This is because the start_date you pass to run_pipeline indicates the first date you want included in the pipeline output, and each day's pipeline output is based on the previous day's data. The purpose of this one-day lag is to avoid lookahead bias: pipeline output tells you what you would have known at the start of each day, based on the previous day's data.

The learning bundle starts on 2007-01-03 (the first trading day of 2007), but if we try to run a pipeline that starts on (or before) that date, we'll get an error that tells us to start one day after the bundle start date:

In [9]:
result = run_pipeline(Pipeline(), start_date='2007-01-03', end_date='2007-01-03')
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In[9], line 1
----> 1 result = run_pipeline(Pipeline(), start_date='2007-01-03', end_date='2007-01-03')

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:95, in run_pipeline(pipeline, start_date, end_date, bundle)
     36 def run_pipeline(
     37     pipeline: Pipeline,
     38     start_date: str,
     39     end_date: str = None,
     40     bundle: str = None
     41     ) -> pd.DataFrame:
     42     """
     43     Compute values for pipeline from start_date to end_date, using the specified
     44     bundle or the default bundle.
   (...)
     93         factor = run_pipeline(pipeline, '2018-01-01', '2019-02-01', bundle="usstock-1min")
     94     """
---> 95     return _run_pipeline(
     96         pipeline,
     97         start_date=start_date,
     98         end_date=end_date,
     99         bundle=bundle)

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:149, in _run_pipeline(pipeline, start_date, end_date, bundle, mask)
    147 second_session = exchange_calendar.next_session(first_session)
    148 if start_date < second_session:
--> 149     raise ValidationError(
    150         f"start_date cannot be earlier than {second_session.date().isoformat()} "
    151         f"for this bundle (one session after the bundle start date of {first_session.date().isoformat()})")
    153 # Roll-forward start_date to valid session
    154 for i in range(100):

ValidationError: start_date cannot be earlier than 2007-01-04 for this bundle (one session after the bundle start date of 2007-01-03)

The second gotcha to keep in mind is that the start_date you choose must also make allowance for the window_length of your factors. The following pipeline includes a 10-day VWAP factor, so if we set the start_date to 2007-01-04 (as suggested by the previous error message), we will get a new error (scroll to the bottom of the traceback for the useful error message):

In [10]:
pipeline = Pipeline(
    columns={
        "vwap": VWAP(window_length=10)
    }
)

result = run_pipeline(pipeline, start_date='2007-01-04', end_date='2007-01-04')
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File index.pyx:598, in pandas._libs.index.DatetimeEngine.get_loc()

File pandas/_libs/hashtable_class_helper.pxi:2606, in pandas._libs.hashtable.Int64HashTable.get_item()

File pandas/_libs/hashtable_class_helper.pxi:2630, in pandas._libs.hashtable.Int64HashTable.get_item()

KeyError: 1166400000000000000

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/base.py:3790, in Index.get_loc(self, key)
   3789 try:
-> 3790     return self._engine.get_loc(casted_key)
   3791 except KeyError as err:

File index.pyx:566, in pandas._libs.index.DatetimeEngine.get_loc()

File index.pyx:600, in pandas._libs.index.DatetimeEngine.get_loc()

KeyError: Timestamp('2006-12-18 00:00:00')

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:631, in DatetimeIndex.get_loc(self, key)
    630 try:
--> 631     return Index.get_loc(self, key)
    632 except KeyError as err:

File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/base.py:3797, in Index.get_loc(self, key)
   3796         raise InvalidIndexError(key)
-> 3797     raise KeyError(key) from err
   3798 except TypeError:
   3799     # If we have a listlike key, _check_indexing_error will raise
   3800     #  InvalidIndexError. Otherwise we fall through and re-raise
   3801     #  the TypeError.

KeyError: Timestamp('2006-12-18 00:00:00')

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:578, in BcolzDailyBarReader._load_raw_arrays_date_to_index(self, date)
    577 try:
--> 578     return self.sessions.get_loc(date)
    579 except KeyError:

File /opt/conda/lib/python3.11/site-packages/pandas/core/indexes/datetimes.py:633, in DatetimeIndex.get_loc(self, key)
    632 except KeyError as err:
--> 633     raise KeyError(orig_key) from err

KeyError: Timestamp('2006-12-18 00:00:00')

During handling of the above exception, another exception occurred:

NoDataOnDate                              Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:763, in SimplePipelineEngine.compute_chunk(self, graph, dates, sids, workspace, refcounts, execution_order, hooks)
    762 try:
--> 763     loaded = loader.load_adjusted_array(
    764         domain, to_load, mask_dates, sids, mask,
    765     )
    766 except NoDataOnDate as e:

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/loaders/equity_pricing_loader.py:90, in EquityPricingLoader.load_adjusted_array(***failed resolving arguments***)
     88 ohlcv_colnames = [c.name for c in ohlcv_cols]
---> 90 raw_ohlcv_arrays = self.raw_price_reader.load_raw_arrays(
     91     ohlcv_colnames,
     92     shifted_dates[0],
     93     shifted_dates[-1],
     94     sids,
     95 )
     97 # Currency convert raw_arrays in place if necessary. We use shifted
     98 # dates to load currency conversion rates to make them line up with
     99 # dates used to fetch prices.

File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:557, in BcolzDailyBarReader.load_raw_arrays(self, columns, start_date, end_date, assets)
    556 def load_raw_arrays(self, columns, start_date, end_date, assets):
--> 557     start_idx = self._load_raw_arrays_date_to_index(start_date)
    558     end_idx = self._load_raw_arrays_date_to_index(end_date)

File /opt/conda/lib/python3.11/site-packages/zipline/data/bcolz_daily_bars.py:580, in BcolzDailyBarReader._load_raw_arrays_date_to_index(self, date)
    579 except KeyError:
--> 580     raise NoDataOnDate(date)

NoDataOnDate: 2006-12-18 00:00:00

During handling of the above exception, another exception occurred:

NoDataOnDate                              Traceback (most recent call last)
Cell In[10], line 7
      1 pipeline = Pipeline(
      2     columns={
      3         "vwap": VWAP(window_length=10)
      4     }
      5 )
----> 7 result = run_pipeline(pipeline, start_date='2007-01-04', end_date='2007-01-04')

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:95, in run_pipeline(pipeline, start_date, end_date, bundle)
     36 def run_pipeline(
     37     pipeline: Pipeline,
     38     start_date: str,
     39     end_date: str = None,
     40     bundle: str = None
     41     ) -> pd.DataFrame:
     42     """
     43     Compute values for pipeline from start_date to end_date, using the specified
     44     bundle or the default bundle.
   (...)
     93         factor = run_pipeline(pipeline, '2018-01-01', '2019-02-01', bundle="usstock-1min")
     94     """
---> 95     return _run_pipeline(
     96         pipeline,
     97         start_date=start_date,
     98         end_date=end_date,
     99         bundle=bundle)

File /opt/conda/lib/python3.11/site-packages/zipline/research/pipeline.py:251, in _run_pipeline(pipeline, start_date, end_date, bundle, mask)
    248 if use_chunks:
    249     # Run in 1-years chunks to reduce memory usage
    250     chunksize = 252
--> 251     results = engine.run_chunked_pipeline(pipeline, start_date, end_date, chunksize=chunksize)
    252 else:
    253     results = engine.run_pipeline(pipeline, start_date, end_date)

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:350, in SimplePipelineEngine.run_chunked_pipeline(self, pipeline, start_date, end_date, chunksize, hooks)
    348 run_pipeline = partial(self._run_pipeline_impl, pipeline, hooks=hooks)
    349 with hooks.running_pipeline(pipeline, start_date, end_date):
--> 350     chunks = [run_pipeline(s, e) for s, e in ranges]
    352 if len(chunks) == 1:
    353     # OPTIMIZATION: Don't make an extra copy in `categorical_df_concat`
    354     # if we don't have to.
    355     return chunks[0]

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:350, in <listcomp>(.0)
    348 run_pipeline = partial(self._run_pipeline_impl, pipeline, hooks=hooks)
    349 with hooks.running_pipeline(pipeline, start_date, end_date):
--> 350     chunks = [run_pipeline(s, e) for s, e in ranges]
    352 if len(chunks) == 1:
    353     # OPTIMIZATION: Don't make an extra copy in `categorical_df_concat`
    354     # if we don't have to.
    355     return chunks[0]

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:440, in SimplePipelineEngine._run_pipeline_impl(self, pipeline, start_date, end_date, hooks)
    434 execution_order = plan.execution_order(workspace, refcounts)
    436 with hooks.computing_chunk(execution_order,
    437                            start_date,
    438                            end_date):
--> 440     results = self.compute_chunk(
    441         graph=plan,
    442         dates=dates,
    443         sids=sids,
    444         workspace=workspace,
    445         refcounts=refcounts,
    446         execution_order=execution_order,
    447         hooks=hooks,
    448     )
    450 return self._to_narrow(
    451     plan.outputs,
    452     results,
   (...)
    455     sids,
    456 )

File /opt/conda/lib/python3.11/site-packages/zipline/pipeline/engine.py:777, in SimplePipelineEngine.compute_chunk(self, graph, dates, sids, workspace, refcounts, execution_order, hooks)
    767         extra_rows = graph.extra_rows[term]
    768         msg = (
    769             f"the pipeline definition requires {term} data on {str(e)} but no bundle data is "
    770             "available on that date; the cause of this issue is that another pipeline term needs "
   (...)
    775             f"the problem:\n\n{repr(graph)}"
    776         )
--> 777         raise NoDataOnDate(msg)
    778 assert set(loaded) == set(to_load), (
    779     'loader did not return an AdjustedArray for each column\n'
    780     'expected: %r\n'
   (...)
    784     )
    785 )
    786 workspace.update(loaded)

NoDataOnDate: the pipeline definition requires EquityPricing<US>.close::float64 data on 2006-12-18 00:00:00 but no bundle data is available on that date; the cause of this issue is that another pipeline term needs EquityPricing<US>.close::float64 and has a window_length of 10, which necessitates loading 9 extra rows of EquityPricing<US>.close::float64; try setting a later start date so that the maximum window_length of any term doesn't extend further back than the bundle start date. Review the pipeline dependencies below to help determine which terms are causing the problem:

{'dependencies': [{'term': EquityPricing<US>.close::float64,
                   'used_by': VWAP([EquityPricing.close, EquityPricing.volume], 10)},
                  {'term': EquityPricing<US>.volume::float64,
                   'used_by': VWAP([EquityPricing.close, EquityPricing.volume], 10)}],
 'nodes': [{'extra_rows': 9, 'needed_for': EquityPricing<US>.close::float64},
           {'extra_rows': 9, 'needed_for': EquityPricing<US>.volume::float64}]}

The error message indicates that we would need data back to 2006-12-18 in order to calculate a 10-day VWAP and produce pipeline output on 2007-01-04 (window_length is measured in trading days, not calendar days). The solution is to set a later start date so that the VWAP factor doesn't require data prior to the bundle start date of 2007-01-03. In this example, the earliest possible start_date turns out to be 2007-01-18 (14 calendar days, or 10 trading days, after 2007-01-04).

In [11]:
result = run_pipeline(pipeline, start_date='2007-01-18', end_date='2007-01-18')
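The session arithmetic above can be sketched with a hand-written session list (January 15, 2007, a Monday, was the MLK Day market holiday). Output on a given date requires window_length sessions of prior data, so the earliest valid output date is the session at index window_length in the bundle's session list:

```python
# Bundle sessions written out by hand; 2007-01-15 (MLK Day) is a holiday.
sessions = ['2007-01-03', '2007-01-04', '2007-01-05', '2007-01-08',
            '2007-01-09', '2007-01-10', '2007-01-11', '2007-01-12',
            '2007-01-16', '2007-01-17', '2007-01-18', '2007-01-19']

window_length = 10

# The 10 sessions before 2007-01-18 (indices 0-9) exactly fill the window,
# so the session at index `window_length` is the earliest valid output date.
earliest_start = sessions[window_length]
```

In practice you would query the bundle's exchange calendar for the session list rather than writing it by hand.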

Next Lesson: Combining Factors