Docs
天勤策略运行脚本详解教学

天勤策略运行脚本详解教学

天勤策略运行脚本详解教学

教学目标: 深入理解天勤量化策略的运行机制和事件驱动模型


📋 目录


1. 运行脚本概述

1.1 三个版本对比

脚本文件主要区别适用场景
1天勤策略运行.pybacktest_statue=False实盘交易(真实账户)
1天勤策略运行2.pybacktest_statue=False, kq_account=True快期模拟账户
1天勤策略运行kq3.pybacktest_statue=False, kq_account=True快期账户(优化版)

1.2 脚本架构

┌─────────────────────────────────────┐
│         环境配置与初始化             │
│  - 加载环境变量                     │
│  - 导入策略和网关                   │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│         连接天勤网关                │
│  - 创建TianqinGateway实例           │
│  - 连接期货公司                     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│         策略初始化                  │
│  - 创建策略实例                     │
│  - 初始化K线数据                    │
│  - 获取持仓信息                     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│      事件驱动主循环                 │
│  - 监听K线更新 → 执行策略           │
│  - 监听价格变化 → 更新Tick          │
│  - 监听持仓变化 → 更新仓位          │
└─────────────────────────────────────┘

2. 导入模块详解

2.1 系统与工具库

import os                    # 操作系统接口,用于读取环境变量
import time                  # 时间模块,用于延时控制
from dotenv import load_dotenv  # 加载.env环境变量文件
import pandas as pd          # 数据处理库

使用场景:

# 从.env文件读取敏感配置
load_dotenv()
tq_account = os.environ.get("tq_account")
tq_password = os.environ.get("tq_password")

2.2 任务调度器

from apscheduler.schedulers.background import BackgroundScheduler

功能: 定时执行任务(当前代码注释掉了,可用于定期执行策略)

使用示例:

scheduler = BackgroundScheduler(timezone="MST", job_defaults={"max_instances": 20})

# 每5分钟执行一次数据刷新
scheduler.add_job(
    refresh_data,
    "cron",
    args=[exchange, strategy, time_frame],
    minute="*/5"
)
scheduler.start()

2.3 StudyQuant框架

from studyquant import *

导入的核心类和枚举:

  • Exchange: 交易所枚举(CZCE, INE, CFFEX等)
  • Interval: 时间周期枚举(MINUTE, MIN5, HOUR等)
  • OrderType: 订单类型(MARKET, LIMIT)
  • SymbolType: 标的类型(FUTURES, SPOT)
  • RunMode: 运行模式(BACKTESTING, LIVE)
  • Offset: 开平仓方向(OPEN, CLOSE等)

2.4 天勤网关

from tianqin_quant.api.tianqin_gateway import (
    TianqinGateway,        # 天勤网关主类
    to_tq_symbol,          # 符号转换函数
    create_vt_symbol,      # 创建标准化符号
    SUPPORTED_BROKERS,     # 支持的期货公司列表
)

2.5 策略导入

from double_ma_strategy import DoubleMaStrategy     # 双均线策略
from studyquant.app.log.logger import log_engine   # 日志引擎

3. 核心函数分析

3.1 数据刷新函数

def refresh_data(exchange: any, strategy: DoubleMaStrategy, time_frame):
    """
    更新数据并推送给策略
    
    Args:
        exchange: 交易所网关对象
        strategy: 策略实例
        time_frame: K线周期
    """
    # 步骤1: 撤销所有未成交订单
    exchange.CancelAll()
    
    # 步骤2: 获取最新Tick行情
    ticker = exchange.GetTicker()
    strategy.on_tick_data(ticker)
    
    # 步骤3: 获取账户信息
    account = exchange.GetAccount()
    strategy.on_account(account)
    
    # 步骤4: 获取K线数据
    kline_df = exchange.GetKline(time_frame)
    strategy.on_kline(kline_df)

执行流程图:

[撤单] → [获取Tick] → [获取账户] → [获取K线] → [推送给策略]

3.2 订单撤销函数

def cancell_pending_orders(exchange: any):
    """撤销未成交的挂单"""
    for order_data in exchange.order_list:
        symbol, order = order_data
        
        # 跳过已成交订单
        if order.status == "FINISHED":
            continue
        
        # 检查订单是否存活
        if order.is_dead != True:
            # 打印订单状态
            print(f"{symbol}单状态: {order.status}, "
                  f"已成交: {order.volume_orign - order.volume_left} 手")
            
            # 计算订单存续时间
            kline_datetime = exchange.kline.iloc[-1]["datetime"]
            insert_date_time = order["insert_date_time"]
            time_difference = kline_datetime - insert_date_time
            
            # 撤销订单
            exchange.api.cancel_order(order)
            print(f"{symbol}单已撤单, {order}")

关键逻辑:

  1. 订单状态检查: 只处理未完成的订单
  2. 时间计算: 可以根据时间差决定是否撤单
  3. 撤单操作: 调用API撤销订单

4. 主程序流程

4.1 环境配置

# 1. 加载环境变量
load_dotenv()

# 2. 读取天勤账号信息
tq_account = os.environ.get("tq_account")
tq_password = os.environ.get("tq_password")

# 3. 配置期货公司连接
default_setting = {
    "期货公司": SUPPORTED_BROKERS,  # 支持的期货公司
    "用户名": tq_account,
    "密码": tq_password,
    "经纪商代码": "",              # 可选
    "交易服务器": "",              # 可选
}

4.2 策略参数设置

from strategy_setting import sc_setting

# sc_setting 包含:
sc_setting = {
    "symbol": "sc2505",                # 原油期货合约
    "exchange": Exchange.INE,          # 上期能源
    "strategy_name": "SCStrategy",     # 策略名称
    "time_frame": Interval.MIN5,       # 5分钟K线
    "order_amount": 1,                 # 下单数量
    "order_type": OrderType.MARKET,    # 市价单
    "mode": RunMode.BACKTESTING,       # 回测模式
    # ... 更多参数
}

4.3 网关连接

# 创建网关实例
exchange = TianqinGateway(
    sc_setting["symbol"], 
    exchange=sc_setting["exchange"]
)

# 连接方式选择(三选一):

# 方式1: 实盘交易
exchange.connect_tianqin(default_setting, backtest_statue=False)

# 方式2: 回测模式
exchange.connect_tianqin(default_setting, backtest_statue=True, kq_account=False)

# 方式3: 快期模拟账户
exchange.connect_tianqin(default_setting, backtest_statue=False, kq_account=True)

4.4 策略初始化

# 1. 创建策略实例
strategy = DoubleMaStrategy(
    exchange,                          # 网关对象
    sc_setting["strategy_name"],       # 策略名称
    exchange.vt_symbol,                # 标准化符号
    sc_setting                         # 策略配置
)

# 2. 启用实盘交易
strategy.trading = True

# 3. 设置下单数量
strategy.fixed_order_amount = sc_setting["order_amount"]

# 4. 初始化数据
ticker = exchange.get_ticker(exchange.tq_symbol)
kline = exchange.get_kline(
    exchange.tq_symbol,
    interval=sc_setting["time_frame"],
    min_length=strategy.long_period + 10  # 获取足够长度的K线
)
strategy.init_kline(kline)

# 5. 获取并处理持仓
position = exchange.get_position()
strategy.process_tq_position(position)

5. 事件驱动机制

5.1 主循环结构

while True:
    # 等待行情更新(阻塞)
    exchange.api.wait_update()
    
    # 事件1: K线更新
    if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
        处理K线更新...
    
    # 事件2: 价格变化
    if exchange.api.is_changing(exchange.quote, "last_price"):
        处理Tick更新...
    
    # 事件3: 持仓变化
    if exchange.api.is_changing(exchange.position, "pos"):
        处理持仓更新...

5.2 K线更新事件

if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
    # 获取最新K线时间
    kline_time = pd.to_datetime(exchange.kline.iloc[-1]["datetime"])
    close_price = kline.iloc[-1].close
    print(f"K线更新: {kline_time}, 收盘价: {close_price}")
    
    # 撤销挂单
    cancell_pending_orders(exchange)
    
    # 执行策略逻辑
    strategy.on_future_kline()

触发条件: K线的datetime字段发生变化(新K线生成)

执行动作:

  1. 打印K线更新信息
  2. 撤销所有挂单
  3. 调用策略的K线处理函数

5.3 价格变化事件

if exchange.api.is_changing(exchange.quote, "last_price"):
    # 获取最新报价
    ticker = exchange.get_ticker(exchange.tq_symbol)
    
    # 推送给策略
    strategy.on_future_tick(ticker)

触发条件: 最新价(last_price)发生变化

执行动作:

  1. 获取最新Tick数据
  2. 更新策略的Tick信息

5.4 持仓变化事件

if exchange.api.is_changing(exchange.position, "pos"):
    print("净持仓发生变化", exchange.position)
    
    # 获取最新持仓
    position = exchange.get_position()
    
    # 更新策略持仓
    strategy.process_tq_position(position)

触发条件: 净持仓(pos)发生变化

执行动作:

  1. 打印持仓变化信息
  2. 获取最新持仓数据
  3. 更新策略的持仓状态

6. 实战案例

6.1 完整运行示例

场景: 使用快期模拟账户运行原油期货策略

# 步骤1: 配置环境变量 (.env文件)
"""
tq_account=your_account
tq_password=your_password
"""

# 步骤2: 设置策略参数
sc_setting = {
    "symbol": "sc2505",
    "exchange": Exchange.INE,
    "strategy_name": "原油策略",
    "time_frame": Interval.MIN5,
    "order_amount": 1,
    "mode": RunMode.BACKTESTING,
}

# 步骤3: 连接快期账户
exchange = TianqinGateway(sc_setting["symbol"], exchange=sc_setting["exchange"])
exchange.connect_tianqin(default_setting, backtest_statue=False, kq_account=True)

# 步骤4: 初始化策略
strategy = DoubleMaStrategy(exchange, "原油策略", exchange.vt_symbol, sc_setting)
strategy.trading = True

# 步骤5: 初始化数据
kline = exchange.get_kline(exchange.tq_symbol, interval=Interval.MIN5, min_length=30)
strategy.init_kline(kline)

# 步骤6: 运行主循环
while True:
    exchange.api.wait_update()
    
    if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
        cancell_pending_orders(exchange)
        strategy.on_future_kline()
    
    if exchange.api.is_changing(exchange.quote, "last_price"):
        ticker = exchange.get_ticker(exchange.tq_symbol)
        strategy.on_future_tick(ticker)
    
    if exchange.api.is_changing(exchange.position, "pos"):
        position = exchange.get_position()
        strategy.process_tq_position(position)

6.2 调试技巧

打印关键信息

# 在K线更新时打印详细信息
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
    kline_data = exchange.kline.iloc[-1]
    print(f"""
    ========== K线更新 ==========
    时间: {pd.to_datetime(kline_data['datetime'])}
    开盘: {kline_data['open']}
    最高: {kline_data['high']}
    最低: {kline_data['low']}
    收盘: {kline_data['close']}
    成交量: {kline_data['volume']}
    持仓: {strategy.pos}
    快线: {strategy.fast_ma0}
    慢线: {strategy.slow_ma0}
    ============================
    """)

记录订单信息

# 在下单前记录详细信息
def debug_order(strategy, signal, price):
    print(f"""
    ========== 下单信息 ==========
    信号: {signal}
    当前持仓: {strategy.pos}
    下单价格: {price}
    下单数量: {strategy.fixed_order_amount}
    快线: {strategy.fast_ma0:.2f}
    慢线: {strategy.slow_ma0:.2f}
    ============================
    """)

6.3 常见问题排查

问题1: K线不更新

# 检查是否正确订阅行情
print("K线长度:", len(exchange.kline))
print("最后一根K线:", exchange.kline.iloc[-1])

# 确认事件监听
if exchange.api.is_changing(exchange.kline.iloc[-1], ["datetime"]):
    print("K线更新事件触发")
else:
    print("等待K线更新...")

问题2: 订单不成交

# 检查订单状态
for symbol, order in exchange.order_list:
    print(f"""
    订单状态: {order.status}
    委托价格: {order.price}
    委托数量: {order.volume_orign}
    成交数量: {order.volume_orign - order.volume_left}
    市场价格: {exchange.quote.last_price}
    """)

问题3: 持仓异常

# 定期打印持仓信息
print(f"""
当前持仓: {strategy.pos}
多头持仓: {strategy.tq_position.pos_long if strategy.tq_position else 0}
空头持仓: {strategy.tq_position.pos_short if strategy.tq_position else 0}
""")

📝 总结

核心要点

  1. 事件驱动: 通过wait_update()is_changing()实现高效的事件监听
  2. 数据同步: K线、Tick、持仓三大数据源实时同步
  3. 订单管理: 自动撤单机制避免订单堆积
  4. 灵活配置: 支持多种运行模式和账户类型

学习路径

  1. ✅ 理解事件驱动机制
  2. ✅ 掌握数据获取和处理
  3. ✅ 熟悉订单管理流程
  4. ✅ 实践调试和优化

下一步

  • 深入学习策略逻辑实现
  • 了解风控和仓位管理
  • 掌握参数优化方法

作者: Rudy
微信: studyquant88
更多教程: https://studyquant.com