

Pipeline Tutorial › Lesson 11: Custom Factors


Custom Factors

When we first looked at factors, we explored the set of built-in factors. Often, though, the computation we want isn't available as a built-in. One of the most powerful features of the Pipeline API is that it lets us fill that gap by defining our own custom factors.

Conceptually, a custom factor is identical to a built-in factor. It accepts inputs, window_length, and mask as constructor arguments. Like any Factor, it produces one value per security for each day in the pipeline's date range.

Let's take an example of a computation that doesn't exist as a built-in: standard deviation. To create a factor that computes the standard deviation over a trailing window, we can subclass zipline.pipeline.CustomFactor and implement a compute method whose signature is:

def compute(self, today, asset_ids, out, *inputs):
    ...
  • *inputs are M x N numpy arrays, where M is the window_length and N is the number of securities (usually ~8000 unless a mask is provided). *inputs are trailing data windows, with rows ordered from oldest to most recent. There is one M x N array for each BoundColumn in the factor's inputs list, passed in the same order as that list. The data type of each array is the dtype of the corresponding BoundColumn.
  • out is an empty array of length N. out will be the output of our custom factor each day. The job of the compute method is to write output values into out.
  • asset_ids will be an integer array of length N containing security ids corresponding to the columns in our *inputs arrays.
  • today will be a pandas Timestamp representing the day for which compute is being called.

Of these, *inputs and out are most commonly used.
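To make these shapes concrete, here is a numpy-only sketch of what a single compute call receives. It does not use zipline. The harness `simulate_compute_call`, the made-up prices, and the boolean `mask_row` are all hypothetical. Dropping masked-out columns is only an approximation of how a mask narrows N.

```python
import numpy as np

def simulate_compute_call(compute, window, mask_row=None):
    """Hypothetical harness that calls `compute` with the arguments described above.

    `window` is an M x N array (M = window_length, N = securities).
    If `mask_row` is given, only the columns where it is True are passed in,
    so N shrinks to mask_row.sum().
    """
    if mask_row is not None:
        window = window[:, mask_row]
    n = window.shape[1]
    out = np.empty(n)                       # one output slot per security
    asset_ids = np.arange(n)                # placeholder integer ids
    today = None                            # a pandas Timestamp in a real pipeline
    compute(today, asset_ids, out, window)  # compute fills `out` in place
    return out

def last_value(today, asset_ids, out, values):
    # Row -1 is the most recent day in the trailing window.
    out[:] = values[-1]

prices = np.array([
    [10.0, 20.0, 30.0, 40.0],
    [11.0, 21.0, 31.0, 41.0],
    [12.0, 22.0, 32.0, 42.0],
])  # M = 3 days, N = 4 securities

print(simulate_compute_call(last_value, prices))
print(simulate_compute_call(last_value, prices, mask_row=np.array([True, False, True, False])))
```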

An instance of CustomFactor that has been added to a pipeline will have its compute method called every day. For example, let's define a custom factor that computes the standard deviation of the close price over the last 5 days. To start, let's add CustomFactor and numpy to our import statements.

In [1]:
from zipline.pipeline import Pipeline, EquityPricing
from zipline.pipeline.factors import CustomFactor
from zipline.research import run_pipeline
import numpy

Next, let's define our custom factor to calculate the standard deviation over a trailing window using numpy.nanstd:

In [2]:
class StdDev(CustomFactor):
    def compute(self, today, asset_ids, out, values):
        # Calculates the column-wise standard deviation, ignoring NaNs
        out[:] = numpy.nanstd(values, axis=0)
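The `out[:] = ...` slice assignment matters. compute has to write into the array it was handed: rebinding the local name `out` builds a new array and leaves the caller's array untouched. A small numpy sketch with two hypothetical compute functions shows the difference. The zero-filled buffers are only for a deterministic demonstration; the text above describes out as an empty array.

```python
import numpy as np

def compute_in_place(today, asset_ids, out, values):
    # Writes into the caller's buffer (the pattern StdDev uses).
    out[:] = np.nanstd(values, axis=0)

def compute_rebind(today, asset_ids, out, values):
    # Rebinds the local name only; the caller's buffer is never filled.
    out = np.nanstd(values, axis=0)

values = np.array([[1.0, 2.0],
                   [3.0, 2.0]])

good = np.zeros(2)
compute_in_place(None, None, good, values)

bad = np.zeros(2)
compute_rebind(None, None, bad, values)

print(good)  # column-wise standard deviations
print(bad)   # unchanged zeros
```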

Finally, let's instantiate our factor in make_pipeline():

In [3]:
def make_pipeline():
    std_dev = StdDev(inputs=[EquityPricing.close], window_length=5)

    return Pipeline(
        columns={
            'std_dev': std_dev
        }
    )

When this pipeline is run, StdDev.compute() will be called every day with data as follows:

  • values: An M x N numpy array, where M is 5 (window_length), and N is ~8000 (the number of securities in our database on the day in question).
  • out: An empty array of length N (~8000). In this example, the job of compute is to populate out with an array storing the 5-day close price standard deviations.
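compute runs once per day on a trailing window. Over a date range, the effect is roughly that of sliding a window_length-row window down a price history. The sketch below assumes a small hypothetical close-price matrix (rows are days, oldest first; columns are securities). It illustrates the idea only and is not how zipline schedules the calls internally.

```python
import numpy as np

def rolling_nanstd(closes, window_length):
    """Apply a StdDev-style compute body to each trailing window.

    closes: T x N array of prices, oldest row first.
    Returns a (T - window_length + 1) x N array, one row per day that has
    a full trailing window available.
    """
    t, n = closes.shape
    results = np.empty((t - window_length + 1, n))
    for end in range(window_length, t + 1):
        window = closes[end - window_length:end]   # M x N trailing window
        out = results[end - window_length]         # view onto one output row
        out[:] = np.nanstd(window, axis=0)         # same body as StdDev.compute
    return results

closes = np.array([
    [1.0, 5.0],
    [2.0, 5.0],
    [3.0, 5.0],
    [4.0, 5.0],
])
print(rolling_nanstd(closes, window_length=2))
```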
In [4]:
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result
/opt/conda/lib/python3.11/site-packages/numpy/lib/nanfunctions.py:1879: RuntimeWarning: Degrees of freedom <= 0 for slice.
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
Out[4]:
                                           std_dev
date       asset
2010-01-05 Equity(FIBBG000C2V3D6 [A])     0.396434
           Equity(QI000000004076 [AABA])  0.106283
           Equity(FIBBG000BZWHH8 [AACC])  0.211528
           Equity(FIBBG000V2S3P6 [AACG])  0.100665
           Equity(FIBBG000M7KQ09 [AAI])   0.020396
           ...                                 ...
           Equity(FIBBG011MC2100 [AATC])  0.132755
           Equity(FIBBG000GDBDH4 [BDG])        NaN
           Equity(FIBBG000008NR0 [ISM])        NaN
           Equity(FIBBG000GZ24W8 [PEM])        NaN
           Equity(FIBBG000BB5S87 [HCH])   0.000000

7841 rows × 1 columns
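The RuntimeWarning and the NaN rows are consistent with how numpy.nanstd handles a column without usable data. With the default ddof=0, a column that is entirely NaN has zero degrees of freedom, so the result is NaN and numpy emits "Degrees of freedom <= 0 for slice". A column with a single non-NaN value, or with constant prices, yields 0.0. The window below is made up to reproduce this behavior; it is not a diagnosis of the specific securities listed.

```python
import warnings
import numpy as np

window = np.array([
    [np.nan, 3.0,    7.0],
    [np.nan, np.nan, 7.0],
    [np.nan, np.nan, 7.0],
])  # columns: all NaN, a single observation, a constant price

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = np.nanstd(window, axis=0)

print(result)                                      # NaN, then 0.0 and 0.0
print([w.category.__name__ for w in caught])       # warning class names raised
```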

Default Inputs

When writing a custom factor, we can set default inputs and window_length in our CustomFactor subclass. For example, let's define the TenDayMeanDifference custom factor to compute the mean difference between two data columns over a trailing window using numpy.nanmean. Let's set the default inputs to [EquityPricing.close, EquityPricing.open] and the default window_length to 10:

In [5]:
class TenDayMeanDifference(CustomFactor):
    # Default inputs and window length.
    inputs = [EquityPricing.close, EquityPricing.open]
    window_length = 10
    def compute(self, today, asset_ids, out, close, open):
        # Calculates the column-wise mean difference, ignoring NaNs
        out[:] = numpy.nanmean(close - open, axis=0)

Remember in this case that `close` and `open` are each 10 x ~8000 2D numpy arrays.
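The compute body can be checked on small made-up arrays (3 days x 2 securities rather than 10 x ~8000) with a standalone function, since the numpy logic does not depend on zipline. The subtraction is element-wise, so a NaN on one day removes only that day's difference from the mean. The function name and the `open_` spelling are choices made for this sketch.

```python
import numpy as np

def ten_day_mean_difference(close, open_):
    # Same body as TenDayMeanDifference.compute, without the zipline plumbing:
    # element-wise close - open, then the column-wise mean ignoring NaNs.
    return np.nanmean(close - open_, axis=0)

close = np.array([[10.0, 20.0],
                  [11.0, np.nan],
                  [12.0, 22.0]])
open_ = np.array([[ 9.0, 21.0],
                  [10.0, 20.0],
                  [11.0, 21.0]])

print(ten_day_mean_difference(close, open_))  # second column averages -1.0 and 1.0
```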

If we call TenDayMeanDifference without providing any arguments, it will use the defaults.

In [6]:
# Computes the 10-day mean difference between the daily open and close prices.
close_open_diff = TenDayMeanDifference()

The defaults can be manually overridden by specifying arguments in the constructor call.

In [7]:
# Computes the 10-day mean difference between the daily high and low prices.
high_low_diff = TenDayMeanDifference(inputs=[EquityPricing.high, EquityPricing.low])
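Because the input windows are passed to compute positionally, in the order of the inputs list, the parameter names close and open are only labels. With inputs=[EquityPricing.high, EquityPricing.low], the close parameter receives high prices and open receives low prices. A plain numpy sketch of that positional mapping, with hypothetical one-security arrays:

```python
import numpy as np

def compute(today, asset_ids, out, close, open):
    # Parameter names are labels only; arrays arrive in `inputs` order.
    out[:] = np.nanmean(close - open, axis=0)

high = np.array([[12.0], [13.0]])   # 2 days x 1 security
low = np.array([[10.0], [10.0]])

out = np.empty(1)
inputs = [high, low]                # mirrors inputs=[EquityPricing.high, EquityPricing.low]
compute(None, None, out, *inputs)   # `close` receives high, `open` receives low
print(out)                          # mean daily high-low range
```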

Further Example

Let's take another example where we build a momentum custom factor and use it to create a filter. We will then use that filter as a screen for our pipeline.

Let's start by defining a Momentum factor as the ratio of the most recent close price to the oldest close price in the trailing window, that is, the close from window_length - 1 trading days earlier.

In [8]:
class Momentum(CustomFactor):
    # Default inputs
    inputs = [EquityPricing.close]

    # Momentum: latest close (last row) divided by the oldest close (first row)
    def compute(self, today, assets, out, close):
        out[:] = close[-1] / close[0]
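The rows are ordered oldest to newest, so close[0] is the first day of the window and close[-1] is the last. A window_length of 10 therefore spans nine day-to-day price changes. A numpy check on made-up prices:

```python
import numpy as np

def momentum(close):
    # Same body as Momentum.compute: latest close / oldest close in the window.
    return close[-1] / close[0]

# Hypothetical closes for 2 securities over a 10-row window (oldest first).
close = np.column_stack([
    np.linspace(100.0, 109.0, 10),   # rises by 1.0 per day
    np.full(10, 50.0),               # flat
])
print(close.shape)       # (10, 2)
print(momentum(close))   # 109 / 100 for the first security, 1.0 for the flat one
```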

Now, let's instantiate our Momentum factor (twice) to create a 10-day momentum factor and a 20-day momentum factor. Let's also create a positive_momentum filter that returns True for securities whose 10-day and 20-day momentum are both positive, meaning each ratio is above 1.

In [9]:
ten_day_momentum = Momentum(window_length=10)
twenty_day_momentum = Momentum(window_length=20)

positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))
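The parentheses around each comparison are required. In Python, & binds more tightly than >, so writing ten_day_momentum > 1 & twenty_day_momentum > 1 would be parsed as a chained comparison around 1 & twenty_day_momentum. numpy boolean arrays follow the same precedence rule, which makes them a convenient way to check the logic with hypothetical values:

```python
import numpy as np

# Hypothetical momentum ratios for four securities.
ten_day = np.array([1.05, 0.98, 1.10, 1.01])
twenty_day = np.array([1.02, 1.03, 0.95, 1.04])

# Parenthesize each comparison: & is evaluated before >.
positive = (ten_day > 1) & (twenty_day > 1)
print(positive)   # [ True False False  True]
```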

Next, let's add our momentum factors and our positive_momentum filter to make_pipeline. Let's also pass positive_momentum as a screen to our pipeline.

In [10]:
def make_pipeline():

    ten_day_momentum = Momentum(window_length=10)
    twenty_day_momentum = Momentum(window_length=20)

    positive_momentum = ((ten_day_momentum > 1) & (twenty_day_momentum > 1))

    std_dev = StdDev(inputs=[EquityPricing.close], window_length=5)

    return Pipeline(
        columns={
            'std_dev': std_dev,
            'ten_day_momentum': ten_day_momentum,
            'twenty_day_momentum': twenty_day_momentum
        },
        screen=positive_momentum
    )

Running this pipeline outputs the standard deviation and each of our momentum computations for securities with positive 10-day and 20-day momentum.

In [11]:
result = run_pipeline(make_pipeline(), start_date='2010-01-05', end_date='2010-01-05')
result
/opt/conda/lib/python3.11/site-packages/numpy/lib/nanfunctions.py:1879: RuntimeWarning: Degrees of freedom <= 0 for slice.
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
Out[11]:
                                           std_dev  ten_day_momentum  twenty_day_momentum
date       asset
2010-01-05 Equity(FIBBG000C2V3D6 [A])     0.396434          1.064626             1.048225
           Equity(QI000000004076 [AABA])  0.106283          1.059480             1.125741
           Equity(FIBBG000BZWHH8 [AACC])  0.211528          1.191667             1.185738
           Equity(FIBBG000BD1373 [AAIC])  0.218998          1.036842             1.133813
           Equity(FIBBG000C2LZP3 [AAON])  0.185084          1.009086             1.053091
           ...                                 ...               ...                  ...
           Equity(FIBBG000N8D1G2 [ZSTN])  0.782800          1.269780             1.346630
           Equity(FIBBG000BXB8X8 [ZTR])   0.010172          1.012920             1.028857
           Equity(FIBBG000PYX812 [ZUMZ])  0.168713          1.003247             1.038655
           Equity(FIBBG000C3CQP1 [ZVO])   0.093894          1.003372             1.026915
           Equity(FIBBG000PZKV21 [ZZ])    0.044091          1.045752             1.114983

4534 rows × 3 columns
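Conceptually, the screen drops every row for which the filter is False. That is why this output has 4534 rows, while the unscreened run earlier in this lesson had 7841. Comparisons against NaN evaluate to False, so securities with missing momentum values are screened out as well. The following is a rough numpy analogy of that row selection with hypothetical values. It is not how zipline applies screens internally.

```python
import numpy as np

assets = np.array(["A", "B", "C", "D"])
std_dev = np.array([0.40, 0.11, np.nan, 0.21])
ten_day = np.array([1.06, 0.97, np.nan, 1.19])
twenty_day = np.array([1.05, 1.13, 1.02, 1.19])

# NaN > 1 is False, so rows with missing momentum are dropped too.
screen = (ten_day > 1) & (twenty_day > 1)
print(assets[screen])    # only rows where the filter is True survive
print(std_dev[screen])
```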

Custom factors let us define our own computations in a pipeline, and they are often the most convenient way to combine multiple data columns in a single calculation. The full documentation for CustomFactor is available in the API Reference.


Next Lesson: Initial Universe