Docs
天勤策略运行脚本详解教学
天勤策略运行脚本详解教学
天勤策略运行脚本详解教学
教学目标: 深入理解天勤量化策略的运行机制和事件驱动模型
📋 目录
1. 运行脚本概述
1.1 三个版本对比
脚本文件 | 主要区别 | 适用场景 |
---|---|---|
1天勤策略运行.py | backtest_statue=False | 实盘交易(真实账户) |
1天勤策略运行2.py | backtest_statue=False, kq_account=True | 快期模拟账户 |
1天勤策略运行kq3.py | backtest_statue=False, kq_account=True | 快期账户(优化版) |
1.2 脚本架构
┌─────────────────────────────────────┐
│ 环境配置与初始化 │
│ - 加载环境变量 │
│ - 导入策略和网关 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 连接天勤网关 │
│ - 创建TianqinGateway实例 │
│ - 连接期货公司 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 策略初始化 │
│ - 创建策略实例 │
│ - 初始化K线数据 │
│ - 获取持仓信息 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 事件驱动主循环 │
│ - 监听K线更新 → 执行策略 │
│ - 监听价格变化 → 更新Tick │
│ - 监听持仓变化 → 更新仓位 │
└─────────────────────────────────────┘
2. 导入模块详解
2.1 系统与工具库
import os # 操作系统接口,用于读取环境变量
import time # 时间模块,用于延时控制
from dotenv import load_dotenv # 加载.env环境变量文件
import pandas as pd # 数据处理库
使用场景:
# 从.env文件读取敏感配置
load_dotenv()
tq_account = os.environ.get("tq_account")
tq_password = os.environ.get("tq_password")
2.2 任务调度器
from apscheduler.schedulers.background import BackgroundScheduler
功能: 定时执行任务(当前代码注释掉了,可用于定期执行策略)
使用示例:
scheduler = BackgroundScheduler(timezone="MST", job_defaults={"max_instances": 20})
# 每5分钟执行一次数据刷新
scheduler.add_job(
refresh_data,
"cron",
args=[exchange, strategy, time_frame],
minute="*/5"
)
scheduler.start()
2.3 StudyQuant框架
from studyquant import *
导入的核心类和枚举:
Exchange
: 交易所枚举(CZCE, INE, CFFEX等)Interval
: 时间周期枚举(MINUTE, MIN5, HOUR等)OrderType
: 订单类型(MARKET, LIMIT)SymbolType
: 标的类型(FUTURES, SPOT)RunMode
: 运行模式(BACKTESTING, LIVE)Offset
: 开平仓方向(OPEN, CLOSE等)
2.4 天勤网关
from tianqin_quant.api.tianqin_gateway import (
TianqinGateway, # 天勤网关主类
to_tq_symbol, # 符号转换函数
create_vt_symbol, # 创建标准化符号
SUPPORTED_BROKERS, # 支持的期货公司列表
)
2.5 策略导入
from double_ma_strategy import DoubleMaStrategy # 双均线策略
from studyquant.app.log.logger import log_engine # 日志引擎
3. 核心函数分析
3.1 数据刷新函数
def refresh_data(exchange: any, strategy: DoubleMaStrategy, time_frame):
"""
更新数据并推送给策略
Args:
exchange: 交易所网关对象
strategy: 策略实例
time_frame: K线周期
"""
# 步骤1: 撤销所有未成交订单
exchange.CancelAll()
# 步骤2: 获取最新Tick行情
ticker = exchange.GetTicker()
strategy.on_tick_data(ticker)
# 步骤3: 获取账户信息
account = exchange.GetAccount()
strategy.on_account(account)
# 步骤4: 获取K线数据
kline_df = exchange.GetKline(time_frame)
strategy.on_kline(kline_df)
执行流程图:
[撤单] → [获取Tick] → [获取账户] → [获取K线] → [推送给策略]
3.2 订单撤销函数
def cancell_pending_orders(exchange: any):
"""撤销未成交的挂单"""
for order_data in exchange.order_list:
symbol, order = order_data
# 跳过已成交订单
if order.status == "FINISHED":
continue
# 检查订单是否存活
if order.is_dead != True:
# 打印订单状态
print(f"{symbol}单状态: {order.status}, "
f"已成交: {order.volume_orign - order.volume_left} 手")
# 计算订单存续时间
kline_datetime = exchange.kline.iloc[-1]["datetime"]
insert_date_time = order["insert_date_time"]
time_difference = kline_datetime - insert_date_time
# 撤销订单
exchange.api.cancel_order(order)
print(f"{symbol}单已撤单, {order}")
关键逻辑:
- 订单状态检查: 只处理未完成的订单
- 时间计算: 可以根据时间差决定是否撤单
- 撤单操作: 调用API撤销订单
4. 主程序流程
4.1 环境配置
# 1. 加载环境变量
load_dotenv()
# 2. 读取天勤账号信息
tq_account = os.environ.get("tq_account")
tq_password = os.environ.get("tq_password")
# 3. 配置期货公司连接
default_setting = {
"期货公司": SUPPORTED_BROKERS, # 支持的期货公司
"用户名": tq_account,
"密码": tq_password,
"经纪商代码": "", # 可选
"交易服务器": "", # 可选
}
4.2 策略参数设置
from strategy_setting import sc_setting
# sc_setting 包含:
sc_setting = {
"symbol": "sc2505", # 原油期货合约
"exchange": Exchange.INE, # 上期能源
"strategy_name": "SCStrategy", # 策略名称
"time_frame": Interval.MIN5, # 5分钟K线
"order_amount": 1, # 下单数量
"order_type": OrderType.MARKET, # 市价单
"mode": RunMode.BACKTESTING, # 回测模式
# ... 更多参数
}
4.3 网关连接
# 创建网关实例
exchange = TianqinGateway(
sc_setting["symbol"],
exchange=sc_setting["exchange"]
)
# 连接方式选择(三选一):
# 方式1: 实盘交易
exchange.connect_tianqin(default_setting, backtest_statue=False)
# 方式2: 回测模式
exchange.connect_tianqin(default_setting, backtest_statue=True, kq_account=False)
# 方式3: 快期模拟账户
exchange.connect_tianqin(default_setting, backtest_statue=False, kq_account=True)
4.4 策略初始化
# 1. 创建策略实例
strategy = DoubleMaStrategy(
exchange, # 网关对象
sc_setting["strategy_name"], # 策略名称
exchange.vt_symbol, # 标准化符号
sc_setting # 策略配置
)
# 2. 启用实盘交易
strategy.trading = True
# 3. 设置下单数量
strategy.fixed_order_amount = sc_setting["order_amount"]
# 4. 初始化数据
ticker = exchange.get_ticker(exchange.tq_symbol)
kline = exchange.get_kline(
exchange.tq_symbol,
interval=sc_setting["time_frame"],
min_length=strategy.long_period + 10 # 获取足够长度的K线
)
strategy.init_kline(kline)
# 5. 获取并处理持仓
position = exchange.get_position()
strategy.process_tq_position(position)
5. 事件驱动机制
5.1 主循环结构
while True:
# 等待行情更新(阻塞)
exchange.api.wait_update()
# 事件1: K线更新
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
处理K线更新...
# 事件2: 价格变化
if exchange.api.is_changing(exchange.quote, "last_price"):
处理Tick更新...
# 事件3: 持仓变化
if exchange.api.is_changing(exchange.position, "pos"):
处理持仓更新...
5.2 K线更新事件
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
# 获取最新K线时间
kline_time = pd.to_datetime(exchange.kline.iloc[-1]["datetime"])
close_price = kline.iloc[-1].close
print(f"K线更新: {kline_time}, 收盘价: {close_price}")
# 撤销挂单
cancell_pending_orders(exchange)
# 执行策略逻辑
strategy.on_future_kline()
触发条件: K线的datetime字段发生变化(新K线生成)
执行动作:
- 打印K线更新信息
- 撤销所有挂单
- 调用策略的K线处理函数
5.3 价格变化事件
if exchange.api.is_changing(exchange.quote, "last_price"):
# 获取最新报价
ticker = exchange.get_ticker(exchange.tq_symbol)
# 推送给策略
strategy.on_future_tick(ticker)
触发条件: 最新价(last_price)发生变化
执行动作:
- 获取最新Tick数据
- 更新策略的Tick信息
5.4 持仓变化事件
if exchange.api.is_changing(exchange.position, "pos"):
print("净持仓发生变化", exchange.position)
# 获取最新持仓
position = exchange.get_position()
# 更新策略持仓
strategy.process_tq_position(position)
触发条件: 净持仓(pos)发生变化
执行动作:
- 打印持仓变化信息
- 获取最新持仓数据
- 更新策略的持仓状态
6. 实战案例
6.1 完整运行示例
场景: 使用快期模拟账户运行原油期货策略
# 步骤1: 配置环境变量 (.env文件)
"""
tq_account=your_account
tq_password=your_password
"""
# 步骤2: 设置策略参数
sc_setting = {
"symbol": "sc2505",
"exchange": Exchange.INE,
"strategy_name": "原油策略",
"time_frame": Interval.MIN5,
"order_amount": 1,
"mode": RunMode.BACKTESTING,
}
# 步骤3: 连接快期账户
exchange = TianqinGateway(sc_setting["symbol"], exchange=sc_setting["exchange"])
exchange.connect_tianqin(default_setting, backtest_statue=False, kq_account=True)
# 步骤4: 初始化策略
strategy = DoubleMaStrategy(exchange, "原油策略", exchange.vt_symbol, sc_setting)
strategy.trading = True
# 步骤5: 初始化数据
kline = exchange.get_kline(exchange.tq_symbol, interval=Interval.MIN5, min_length=30)
strategy.init_kline(kline)
# 步骤6: 运行主循环
while True:
exchange.api.wait_update()
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
cancell_pending_orders(exchange)
strategy.on_future_kline()
if exchange.api.is_changing(exchange.quote, "last_price"):
ticker = exchange.get_ticker(exchange.tq_symbol)
strategy.on_future_tick(ticker)
if exchange.api.is_changing(exchange.position, "pos"):
position = exchange.get_position()
strategy.process_tq_position(position)
6.2 调试技巧
打印关键信息
# 在K线更新时打印详细信息
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
kline_data = exchange.kline.iloc[-1]
print(f"""
========== K线更新 ==========
时间: {pd.to_datetime(kline_data['datetime'])}
开盘: {kline_data['open']}
最高: {kline_data['high']}
最低: {kline_data['low']}
收盘: {kline_data['close']}
成交量: {kline_data['volume']}
持仓: {strategy.pos}
快线: {strategy.fast_ma0}
慢线: {strategy.slow_ma0}
============================
""")
记录订单信息
# 在下单前记录详细信息
def debug_order(strategy, signal, price):
print(f"""
========== 下单信息 ==========
信号: {signal}
当前持仓: {strategy.pos}
下单价格: {price}
下单数量: {strategy.fixed_order_amount}
快线: {strategy.fast_ma0:.2f}
慢线: {strategy.slow_ma0:.2f}
============================
""")
6.3 常见问题排查
问题1: K线不更新
# 检查是否正确订阅行情
print("K线长度:", len(exchange.kline))
print("最后一根K线:", exchange.kline.iloc[-1])
# 确认事件监听
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
print("K线更新事件触发")
else:
print("等待K线更新...")
问题2: 订单不成交
# 检查订单状态
for symbol, order in exchange.order_list:
print(f"""
订单状态: {order.status}
委托价格: {order.price}
委托数量: {order.volume_orign}
成交数量: {order.volume_orign - order.volume_left}
市场价格: {exchange.quote.last_price}
""")
问题3: 持仓异常
# 定期打印持仓信息
print(f"""
当前持仓: {strategy.pos}
多头持仓: {strategy.tq_position.pos_long if strategy.tq_position else 0}
空头持仓: {strategy.tq_position.pos_short if strategy.tq_position else 0}
""")
📝 总结
核心要点
- 事件驱动: 通过
wait_update()
和is_changing()
实现高效的事件监听 - 数据同步: K线、Tick、持仓三大数据源实时同步
- 订单管理: 自动撤单机制避免订单堆积
- 灵活配置: 支持多种运行模式和账户类型
学习路径
- ✅ 理解事件驱动机制
- ✅ 掌握数据获取和处理
- ✅ 熟悉订单管理流程
- ✅ 实践调试和优化
下一步
- 深入学习策略逻辑实现
- 了解风控和仓位管理
- 掌握参数优化方法
作者: Rudy
微信: studyquant88
更多教程: https://studyquant.com