Cloudreve 是什么?

引用自 https://docs.cloudreve.org/

Cloudreve 可以让您快速搭建起公私兼备的网盘系统。Cloudreve 在底层支持不同的云存储平台,用户在实际使用时无须关心物理存储方式。你可以使用 Cloudreve 搭建个人用网盘、文件分享系统,亦或是针对大小团体的公有云系统。

Cloudreve 的离线下载核心由 Aria2 驱动。正确配置并启用离线下载功能后,用户可以创建磁力链、HTTP、种子下载任务,由服务端下载完成后加入到用户文件中。

准备工作

本篇文章使用 Python ≥ 3.8
Aria2 的安装与配置此处不再讨论,请自行寻找相关教程。
配置离线下载请查看 Cloudreve 官方文档。

下面的内容默认上述条件已配置完成

创造需求

假如某A科游戏,在官网提供直链安装包,但是下载太慢(几kb那种)。可以利用 离线下载 自动下载至 vps,后续再通过 vps 下载到手机/Pad上,以此获得比较理想的速度。

先决条件

如何得知什么时候该下载?一般取得游戏版本号或哈希就可以解决。
幸运的是该游戏官网有一个带有版本号和下载链接的 api。

1
2
3
4
5
6
7
{
"success": true,
"value": {
"url": "https://arcaea-static.lowiro-cdn.net/wdNmVfhaxKt9lQUZ9%2FMKFrV9nEs2SQeVTVs0O%2B48hyrF%2FckYd3BNJsIghHTELo5qQ0143aXcIeLQpWNKVjFgtBCcjzv9r3DW8BUYr7GwRvYTUcxi5IyzqxZb3Rxb9F%2B9EvguuumWMUcSZXg%3D?filename=arcaea_4.2.1c.apk",
"version": "4.2.1c"
}
}

Cloudreve Api
https://github.com/cloudreve/Cloudreve/blob/master/routers/router.go

实现需求

整理需要用到的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
url = "https://webapi.lowiro.com/webapi/serve/static/bin/arcaea/apk"

saveDirectory = "/Arcaea" #保存路径,需要提前建好

cloudreveUrl = "http://127.0.0.1:1145" #cloudreve访问链接
session = cloudreveUrl + "/api/v3/user/session"
directory = cloudreveUrl + "/api/v3/directory" + saveDirectory.replace("/", "%2F")
objectSet = cloudreveUrl + "/api/v3/object"
aria2Download = cloudreveUrl + "/api/v3/aria2/url"

header = {
"accept": 'application/json, text/plain, */*',
"cookie": cookie
}

实现接口调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 登录,获取cookie(不要开启登录验证码)
def Login(user, password):
data = json.dumps({"userName": user,"Password": password,"captchaCode": ""})
response = requests.post(session, data, allow_redirects=False)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
cookies = response.cookies.items()
cookie = ''
for name, value in cookies:
cookie += '{0}={1};'.format(name, value)
return cookie
return None

# 获取文件列表
def GetFileList():
response = requests.get(directory, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
return resJson
return None

#删除文件
def DeleteApk(id):
data = json.dumps({"items": [id], "dirs": []})
response = requests.delete(objectSet, data=data, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
return "Success"
return resJson["msg"]
return response.text

#添加离线任务
def AddCloudreveAria2(url):
data = json.dumps({"url": [url],"dst": saveDirectory})
response = requests.post(aria2Download, data, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
return "Success"
return resJson["msg"]
return response.text

实现最终需求

最后就是根据自己的需求编写代码

这里直接贴上我写好的代码,需要系统定时调用
(仅自用,不能保证可靠性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import requests
import json
import os

url = "https://webapi.lowiro.com/webapi/serve/static/bin/arcaea/apk"

saveDirectory = "/Arcaea"

cloudreveUrl = "http://127.0.0.1:1145"
session = cloudreveUrl + "/api/v3/user/session"
directory = cloudreveUrl + "/api/v3/directory" + saveDirectory.replace("/", "%2F")
objectSet = cloudreveUrl + "/api/v3/object"
aria2Download = cloudreveUrl + "/api/v3/aria2/url"

user = ""
password = ""

filename = os.getcwd() + "/ArcaeaVer.txt"
cookieFile = os.getcwd() + "/key"

def Main():
cookie = ReadCookie()
global header
header = {
"accept": 'application/json, text/plain, */*',
"cookie": cookie
}
response = requests.get(url)
resJson = json.loads(response.text)
if resJson["success"]:
ver = resJson["value"]["version"]
verStatus = Compare(ver)
if not verStatus:
fileStatus = GetFileList(oldVer)
if fileStatus[1]:
DeleteStatus = DeleteApk(fileStatus[0])
if DeleteStatus == "Success":
Ver.Write(ver)
print("已删除旧版本")
else:
print("旧版本删除失败 msg: {}".format(DeleteStatus))
else:
if fileStatus[0] == 0:
Ver.Write(ver)
print("未发现旧版本,无需删除")
elif fileStatus[0] == 401:
print("尝试登录")
status = GetCookie(user, password)
if status:
print("登录成功")
Main()
else:
print("登录失败")
return
downloadStatus = AddCloudreveAria2(resJson["value"]["url"])
if downloadStatus == "Success":
print("{} 创建离线任务成功".format(ver))
else:
print("失败 msg: {}".format(downloadStatus))
else:
print("{} 已是最新版本".format(ver))
else:
print("获取失败,请稍后再试")
print(response.text)

class Ver:
@staticmethod
def Read():
if not os.path.exists(filename):
file = open(filename, "w+")
else:
file = open(filename, "r")
ver = file.read()
file.close()
return ver
@staticmethod
def Write(newVer):
file = open(filename, "w+")
file.write(newVer)
file.close()

# 获取cookie
def GetCookie(user, password):
cookie = Login(user, password)
if not cookie == None:
file = open(cookieFile, "w+")
file.write(cookie)
return True
return False

def ReadCookie():
if os.path.exists(cookieFile):
file = open(cookieFile, "r")
return file.read()
return None

# 比较版本
def Compare(newVer):
global oldVer
oldVer = Ver.Read()
if oldVer == newVer:
return True
return False

# 登录
def Login(user, password):
data = json.dumps({"userName":user,"Password":password,"captchaCode":""})
response = requests.post(session, data, allow_redirects=False)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
cookies = response.cookies.items()
cookie = ''
for name, value in cookies:
cookie += '{0}={1};'.format(name, value)
return cookie
return None

# 获取文件列表
def GetFileList(ver):
response = requests.get(directory, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
objects = resJson["data"]["objects"]
for ob in objects:
if ob["name"] == "arcaea_{}.apk".format(ver):
return [ob["id"], True]
return [resJson["code"], False]
return [response.text, False]

#删除文件
def DeleteApk(id):
data = json.dumps({"items": [id], "dirs": []})
response = requests.delete(objectSet, data=data, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
return "Success"
return resJson["msg"]
return response.text

#添加离线任务
def AddCloudreveAria2(url):
data = json.dumps({"url": [url],"dst": saveDirectory})
response = requests.post(aria2Download, data, headers=header)
if response.status_code == 200:
resJson = json.loads(response.text)
if resJson["code"] == 0:
return "Success"
return resJson["msg"]
return response.text

if __name__ == "__main__":
Main()