import sfgrid_constants import xtquant.xtconstant as xtconstant from xtquant import xtdata, xttrader from xtquant.xttype import StockAccount, XtOrder, XtPosition import datetime def is_trading_time(): """ 判断当前时间是否在周一至周五的9:30~11:30或13:00~15:00时间段内 返回: True(在交易时间内) 或 False(不在交易时间内) """ # 获取当前时间 now = datetime.datetime.now() # 获取星期几 (0=周一, 1=周二, ..., 4=周五) weekday = now.weekday() # 判断是否为周一到周五 (0-4) if weekday < 5: current_time = now.time() # 第一个时间段: 9:30~11:30 morning_start = datetime.time(9, 30) morning_end = datetime.time(11, 30) # 第二个时间段: 13:00~15:00 afternoon_start = datetime.time(13, 0) afternoon_end = datetime.time(15, 0) # 判断是否在第一个时间段内 if morning_start <= current_time <= morning_end: return True # 判断是否在第二个时间段内 if afternoon_start <= current_time <= afternoon_end: return True return False def getInstrumentName(stock_code): # print(f"getInstrumentName: 获取标的名称 {stock_code}") detail = xtdata.get_instrument_detail(stock_code, False) if detail is None: return "UnNamed" return detail['InstrumentName'] def getStockPosition(stock_code: str, xt_trader: xttrader.XtQuantTrader, account: StockAccount): volume = 0 positions = xt_trader.query_stock_positions(account) if positions: for temp in positions: pos:XtPosition = temp if pos.stock_code == stock_code: volume = pos.volume break return volume def minPosition(gridIndex:int): return sfgrid_constants.grid_volume * gridIndex def queryPendingOrder(stock_code:str, tag: str, xt_trader: xttrader.XtQuantTrader, account: StockAccount) -> list[XtOrder]: if stock_code == None or tag == None: return [] orders = xt_trader.query_stock_orders(account) result = [order for order in orders if order.order_status == xtconstant.ORDER_REPORTED and order.stock_code == stock_code and order.strategy_name == tag] return result