import zipline.api as algo
from zipline.finance.execution import MarketOrder
from zipline.finance import slippage, commission
from quantrocket.realtime import collect_market_data
from codeload.sell_gap.pipeline import make_pipeline
def initialize(context: algo.Context):
    """
    Configure the strategy: pipeline, benchmark, daily schedule and
    transaction cost models.

    Called once at the start of a backtest, and once per day at
    the start of live trading.
    """
    algo.attach_pipeline(make_pipeline(), 'pipeline')
    algo.set_benchmark(algo.symbol("SPY"))

    # Intraday steps, registered in the order they run during the session:
    # scan for gaps 1 minute after the open, enter shorts 10 minutes after
    # the open, and flatten everything 5 minutes before the close.
    daily_steps = (
        (find_down_gaps, algo.time_rules.market_open(minutes=1)),
        (short_down_gaps, algo.time_rules.market_open(minutes=10)),
        (close_positions, algo.time_rules.market_close(minutes=5)),
    )
    for step, when in daily_steps:
        algo.schedule_function(
            step,
            algo.date_rules.every_day(),
            when,
        )

    # Cost assumptions: zero per-share commission, 3 bps fixed slippage.
    per_share_commission = commission.PerShare(cost=0.0)
    algo.set_commission(per_share_commission)

    fixed_bps_slippage = slippage.FixedBasisPointsSlippage(basis_points=3.0)
    algo.set_slippage(fixed_bps_slippage)
def before_trading_start(context: algo.Context, data: algo.BarData):
    """
    Reset the per-day state and, when live trading, kick off real-time
    data collection for today's candidates.

    Called every day before market open.
    """
    # Fresh state for the session: today's pipeline output, an empty
    # short list, and the (negative) dollar value ordered per position.
    context.candidates = algo.pipeline_output('pipeline')
    context.assets_to_short = []
    context.target_value_per_position = -50e3

    # Real-time data is only relevant in the live trading arena.
    if algo.get_environment("arena") != "trade":
        return

    candidate_sids = [asset.real_sid for asset in context.candidates.index]
    if not candidate_sids:
        # No candidates today, so there is nothing to collect.
        return

    collect_market_data(
        "us-stk-realtime",
        sids=candidate_sids,
        until="09:32:00 America/New_York")

    # Map Zipline's bar fields onto the fields of the real-time database.
    bar_field_map = {
        "close": "LastPriceClose",
        "open": "LastPriceOpen",
        "high": "LastPriceHigh",
        "low": "LastPriceLow",
        "volume": "LastSizeSum"}
    algo.set_realtime_db("us-stk-realtime-1min", fields=bar_field_map)
def find_down_gaps(context: algo.Context, data: algo.BarData):
    """
    Select today's short candidates and store them on the context.

    A candidate qualifies when its current open is below both
    ``prior_low - std`` and ``mavg`` (columns of ``context.candidates``).
    Up to 10 qualifying assets with the largest ``std`` are written to
    ``context.assets_to_short``. When there are no candidates the
    function returns without touching ``context.assets_to_short``.
    """
    pipeline_rows = context.candidates
    if not len(pipeline_rows):
        return

    opens = data.current(pipeline_rows.index, 'open')

    # The open must clear the prior low by more than one `std` ...
    gap_threshold = pipeline_rows["prior_low"] - pipeline_rows["std"]
    # ... and also sit under the moving-average column.
    qualifies = (opens < gap_threshold) & (opens < pipeline_rows["mavg"])

    # Rank the survivors by `std`, largest first, and keep the top 10.
    ranked = pipeline_rows[qualifies].sort_values("std", ascending=False)
    context.assets_to_short = ranked.head(10).index
def short_down_gaps(context: algo.Context, data: algo.BarData):
    """
    Place a market order for each asset in ``context.assets_to_short``,
    sized at ``context.target_value_per_position`` dollars.
    """
    for gapper in context.assets_to_short:
        # A new MarketOrder style object is created for every order.
        market_style = MarketOrder()
        algo.order_value(
            gapper,
            context.target_value_per_position,
            style=market_style,
        )
def close_positions(context: algo.Context, data: algo.BarData):
    """
    Flatten the portfolio: for every held position, submit a market
    order for the opposite number of shares.
    """
    holdings = context.portfolio.positions
    for held_asset, holding in holdings.items():
        # Ordering the negated share count offsets the position.
        offsetting_shares = -holding.amount
        algo.order(held_asset, offsetting_shares, style=MarketOrder())