区块链研究实验室|如何使用Python轻松获取Binance历史交易


作为加密货币交易所的前首席技术官,我经常与一些主要交易所的API交往。在本文中,我将指导您完成创建可靠的Python脚本以从Binance提取历史交易数据的过程。
理论基础
在回测交易策略时,即使用过去的数据执行我们的策略并分析收益和其他重要因素时,我们必须确保我们拥有合适的数据类型。鉴于某些策略需要水平的书本数据,而其他策略可能只需要花费一个小时的时间,该过程并不总是那么简单,而基础架构,可用性和连接性等元素可能会因数据类型的不同而大不相同。
所需数据主要由交易策略的频率定义。交易策略类别是我将在Algotrading系列文章中讨论的其他主题的主题,但是您可以在Investopedia中找到一些可靠的信息。
好的,但是为什么本文仅涉及获取“交易”数据,为什么我们要使用Binance API?
数据频率和Binance
我想说交易数据端点主要在99.99%的交易所中提供。它非常细致,提供了足够的详细信息(在某些非常特殊的情况下)用于回测高频交易(HFT)策略,并且可以用作OHLC蜡烛的构建基块(1S至24H,或者更多,如果您需要的话)。
交易数据是通用的,并且允许使用不同频率的策略进行大量实验。
为什么选择币安?那只是因为它是我因交易量而倾向于回溯的交易所之一。我绝不隶属于Binance。我们将在我的Algotrading系列中查看其他交流。
我们将要编码什么?
我们将创建一个Python脚本,该脚本接收对符号,开始日期和结束日期作为命令行参数。它将包含所有交易的CSV文件输出到磁盘。该过程可以通过以下步骤进行详细说明:

1. 分析 symbol, starting_date, 和ending_date参数

2. 获取在开始日期发生的第一笔交易,以获取第一笔trade_id。

3. 循环获取每个请求1,000笔交易(Binance API限制),直到达到ending_date。

4. 最后将数据保存到磁盘。对于示例,我们将其保存为CSV,但是您还有其他选择。

我们将使用pandas,requests,time,sys和datetime库。在代码段中,将不会显示错误验证,因为它不会为说明添加任何值。当然,您可以在GitHub上查看完整代码。https://github.com/tgcandido/binance-data-fetcher
编码时间
解析参数:

symbol:交易对的符号,由Binance定义。可以在这里查询,也可以从Binance Web应用程序的URL复制,但不包括_ character.

starting_date 和ending_date:预期的格式为mm / dd / yyyy,或者在Python lang语中为%m /%d /%Y。
要获取参数,我们将使用内置的sys(这里没有什么花哨的东西),并且为了解析日期,我们将使用datetime库。

symbol=sys.argv[1]starting_date=datetime.strptime(sys.argv[2],'%m/%d/%Y')ending_date=datetime.strptime(sys.argv[3],'%m/%d/%Y')+timedelta(days=1)-timedelta(microseconds=1)


我们将增加一天并减去一微秒,以便ending_date时间部分始终为23:59:59.999,这使得获取同一天的间隔更加实用。
提取交易
使用Binance的API并使用aggTrades端点,一次请求最多可以获取1,000条交易,如果使用开始和结束参数,则它们之间的间隔最多为一小时。在发生一些失败之后,通过使用时间间隔进行获取(在某个时间点或另一个时间点,流动性将变得疯狂,我将失去一些宝贵的交易),我决定尝试from_id策略。
选择aggTrades端点是因为它返回压缩的交易。这样,我们就不会丢失任何宝贵的信息。
进行压缩、聚合交易。在同一时间、同一订单、同一价格进行交易的数量将累计。
from_id策略是这样的:通过将日期间隔发送到端点,我们将获得starting_date的第一笔交易。之后我们将从第一个获取的交易ID开始获取1,000个交易。然后我们将检查最后一笔交易是否在我们的end_date之后发生。如果是这样,我们已经遍历了所有时间段,可以将结果保存到文件中。否则,我们将更新from_id变量以获取最后的交易ID,然后重新开始循环。
取得第一个交易编号

new_ending_date=from_date+timedelta(seconds=60)r=requests.get('https://api.binance.com/api/v3/aggTrades',params={"symbol":symbol,"startTime":get_unix_ms_from_date(from_date),"endTime":get_unix_ms_from_date(new_ending_date)})response=r.json()iflen(response)>0:returnresponse[0]['a']else:raiseException('notradesfound')


首先我们创建一个new_end_date。那是因为我们通过传递startTime和endTime参数来使用aggTrades。现在我们只需要知道该期间的第一个交易ID,因此我们将增加60秒。在流动性较低的货币对中,可以更改此参数,因为不能保证在请求的第一天发生交易。
然后使用我们的helper函数解析日期,并通过使用calendar.timegm函数将其转换为Unix毫秒表示形式。首选timegm函数,因为它将日期保留为UTC。

defget_unix_ms_from_date(date):returnint(calendar.timegm(date.timetuple())*1000+date.microsecond/1000)


请求的响应是按日期排序的交易对象列表,格式如下:

[{"a":26129,//AggregatetradeId"p":"0.01633102",//Price"q":"4.70443515",//Quantity"f":27781,//FirsttradeId"l":27781,//LasttradeId"T":1498793709153,//Timestamp"m":true,//Wasthebuyerthemaker?"M":true//Wasthetradethebestpricematch?}]
因此,当我们需要第一个交易ID时,我们将返回response[0][“a”]值。
主循环

现在我们有了第一个交易ID,我们可以一次获取1,000笔交易,直到到达ending_date。以下代码将在我们的主循环中调用。它将使用from_id参数执行我们的请求,放弃startDate和endDate参数。
defget_trades(symbol,from_id):r=requests.get("https://api.binance.com/api/v3/aggTrades",params={"symbol":symbol,"limit":1000,"fromId":from_id})returnr.json()
现在这是我们的主循环,它将执行请求并创建我们的DataFrame。

from_id=get_first_trade_id_from_start_date(symbol,from_date)current_time=0df=pd.DataFrame()whilecurrent_time<get_unix_ms_from_date(to_date):trades=get_trades(symbol,from_id)from_id=trades[-1]['a']current_time=trades[-1]['T']print(f'fetched{len(trades)}tradesfromid{from_id}@{datetime.utcfromtimestamp(current_time/1000.0)}')df=pd.concat([df,pd.DataFrame(trades)])#dontexceedrequestlimitstime.sleep(0.5)
我们检查包含最近提取的交易日期的current_time是否大于我们的to_date,如果是,我们:

使用from_id参数获取交易

使用从最新交易中获取的信息来更新from_id和current_time参数

打印一个nice的调试消息

pd.concat与我们DataFrame中的先前交易取得的交易

leep,使Binance不会给我们一个难看的429 HTTP响应

清除与保存
组装完DataFrame之后,我们需要执行简单的数据清理。我们将删除重复项并trim在to_date之后发生的交易(我们遇到了这个问题,因为我们要获取1000笔交易中的大部分,因此预计我们会在目标结束日期之后执行一些交易)。
我们可以封装trim函数:

deftrim(df,to_date):returndf[df['T']<=get_unix_ms_from_date(to_date)]

并执行数据清理:

df.drop_duplicates(subset='a',inplace=True)df=trim(df,to_date)


现在我们可以使用to_csv方法将其保存到文件中:

filename=f'binance__{symbol}__trades__from__{sys.argv[2].replace("/","_")}__to__{sys.argv[3].replace("/","_")}.csv'df.to_csv(filename)


我们还可以使用其他数据存储机制,例如Arctic。
奖励:验证你的数据
在处理交易策略时,我们要相信我们的数据,这一点很重要。我们可以很容易地用提取到的交易数据进行验证:

df=pd.read_csv(file_name)values=df.to_numpy()last_id=values[0][1]forrowinvalues[1:]:trade_id=row[1]iflast_id+1!=trade_id:print('last_id',last_id)print('trade_id',trade_id)print('inconsistentdata')exit()last_id=trade_idprint('dataisOK!')

在代码片段中,我们将DataFrame转换为NumPy数组,并逐行迭代,检查交易ID是否每行递增1。
Binance交易ID是递增编号的,并且是为每个符号创建的,所以很容易验证数据是否正确。
创建成功的交易策略的第一步是拥有正确的数据。
———————————————
原文作者:
Thiago Candido
译者:链三丰
译文出处:http://bitoken.world
———————————————

作者:链三丰,来源:区块链研究实验室

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

留言与评论(共有 0 条评论)
   
验证码:
微信号已复制,请打开微信添加咨询详情!