import zipline.api as algo
from zipline.pipeline import Pipeline
from zipline.pipeline.factors import AverageDollarVolume, Returns
from zipline.finance.execution import MarketOrder
MOMENTUM_WINDOW = 252
def initialize(context: algo.Context):
"""
Called once at the start of a backtest, and once per day in
live trading.
"""
algo.attach_pipeline(make_pipeline(), 'pipeline')
algo.set_benchmark(algo.symbol('SPY'))
algo.schedule_function(
rebalance,
algo.date_rules.every_day(),
algo.time_rules.market_close(minutes=30),
)
def make_pipeline():
"""
Create a pipeline that filters by dollar volume and
calculates return.
"""
pipeline = Pipeline(
columns={
"returns": Returns(window_length=MOMENTUM_WINDOW),
},
screen=AverageDollarVolume(window_length=30) > 10e6
)
return pipeline
def before_trading_start(context: algo.Context, data: algo.BarData):
"""
Called every day before market open.
"""
factors = algo.pipeline_output('pipeline')
returns = factors["returns"].sort_values(ascending=False)
context.winners = returns.index[:3]
def rebalance(context: algo.Context, data: algo.BarData):
"""
Execute orders according to our schedule_function() timing.
"""
current_prices = data.current(context.winners, "price")
prior_closes = data.history(context.winners, "close", 2, "1d").iloc[0]
intraday_returns = (current_prices - prior_closes) / prior_closes
positions = context.portfolio.positions
for asset, position in positions.items():
if asset not in context.winners:
algo.order_target_value(asset, 0, style=MarketOrder())
for asset in context.winners:
if asset in positions:
continue
if intraday_returns[asset] > 0:
continue
algo.order_target_percent(asset, 1/6, style=MarketOrder())