-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpinata_connection.py
206 lines (180 loc) · 7.14 KB
/
pinata_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from streamlit.connections import ExperimentalBaseConnection
import os
import requests
import streamlit as st
import json
import py7zr
class PinataConnection(ExperimentalBaseConnection[requests.Session]):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._resource = self._connect(**kwargs)
def _connect(self, **kwargs) -> requests.Session:
session = requests.Session()
if 'jwt' in kwargs:
jwt = kwargs.pop('jwt')
elif 'jwt' in self._secrets:
jwt = self._secrets['jwt']
else:
jwt = None
if jwt is not None:
session.headers.update({
'Authorization': f'Bearer {jwt}',
})
else:
if 'api_key' in kwargs and 'secret_api_key' in kwargs:
api_key = kwargs.pop('api_key')
secret_api_key = kwargs.pop('secret_api_key')
elif 'api_key' in self._secrets and 'secret_api_key' in self._secrets:
api_key = self._secrets['api_key']
secret_api_key = self._secrets['secret_api_key']
else:
api_key = None
secret_api_key = None
if api_key is not None and secret_api_key is not None:
session.headers.update({
'pinata_api_key': str(api_key),
'pinata_secret_api_key': str(secret_api_key),
})
else:
raise ValueError("Missing authentication details. Please provide either a JWT token or API keys.")
return session
def cursor(self):
return self._resource
def upload_file(self, file, custom_key_values=None):
with self._resource as s:
pinata_metadata = {
"name": file.name,
}
if custom_key_values is not None:
pinata_metadata["keyvalues"] = custom_key_values
response = s.post(
'https://api.pinata.cloud/pinning/pinFileToIPFS',
files={'file': file.read()},
data={
'pinataOptions': '{"cidVersion": 1}',
'pinataMetadata': json.dumps(pinata_metadata)
},
)
return response.json()
def upload_directory(self, directory_path, custom_key_values=None):
# Create a 7z file from the directory
with py7zr.SevenZipFile("temp.7z", 'w') as archive:
archive.writeall(directory_path, 'base')
with self._resource as s:
with open("temp.7z", "rb") as f:
pinata_metadata = {
"name": os.path.basename(directory_path),
}
if custom_key_values is not None:
pinata_metadata["keyvalues"] = custom_key_values
response = s.post(
'https://api.pinata.cloud/pinning/pinFileToIPFS',
files={'file': f.read()},
data={
'pinataOptions': '{"cidVersion": 1}',
'pinataMetadata': json.dumps(pinata_metadata)
},
)
# Remove the temporary 7z file
os.remove("temp.7z")
return response.json()
def pin_by_cid(self, cid, name="cid_content", custom_key_values=None):
payload = {
"hashToPin": cid,
"pinataMetadata": {
"name": name,
"keyvalues": custom_key_values if custom_key_values else {}
}
}
with self._resource as s:
response = s.post(
'https://api.pinata.cloud/pinning/pinByHash',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.json()
def pin_by_json(self, content, name="json_content", custom_key_values=None):
payload = {
"pinataOptions": {
"cidVersion": 1
},
"pinataMetadata": {
"name": name,
"keyvalues": custom_key_values if custom_key_values else {}
},
"pinataContent": content
}
with self._resource as s:
response = s.post(
'https://api.pinata.cloud/pinning/pinJSONToIPFS',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.json()
def update_metadata(self, ipfs_pin_hash, name=None, keyvalues=None):
payload = {
"ipfsPinHash": ipfs_pin_hash,
}
if name is not None:
payload["name"] = name
if keyvalues is not None:
payload["keyvalues"] = keyvalues
with self._resource as s:
response = s.put(
'https://api.pinata.cloud/pinning/hashMetadata',
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
return {"success": True}
else:
return {"success": False, "status_code": response.status_code}
def query(self, status=None, hash_contains=None, page_limit=1000, ttl: int = 3600):
@st.cache_data(ttl=ttl)
def _get_pins(_status, _hash_contains, _page_limit):
params = {
'pageLimit': _page_limit,
}
if _status is not None:
params['status'] = _status
if _hash_contains is not None:
params['hashContains'] = _hash_contains
with self._resource as s:
response = s.get(
'https://api.pinata.cloud/data/pinList',
params=params
)
return response.json()
return _get_pins(status, hash_contains, page_limit)
def list_by_jobs(self, status=None, ipfs_pin_hash=None, limit=1000, offset=0, sort='ASC', ttl: int = 3600):
@st.cache_data(ttl=ttl)
def _list_by_jobs(_status, _ipfs_pin_hash, _limit, _offset, _sort):
params = {
'sort': _sort,
'limit': _limit,
'offset': _offset
}
if _status is not None:
params['status'] = _status
if _ipfs_pin_hash is not None:
params['ipfs_pin_hash'] = _ipfs_pin_hash
with self._resource as s:
response = s.get(
'https://api.pinata.cloud/pinning/pinJobs',
params=params
)
return response.json()
return _list_by_jobs(status, ipfs_pin_hash, limit, offset, sort)
def unpin(self, ipfs_hash):
with self._resource as s:
response = s.delete(
f'https://api.pinata.cloud/pinning/unpin/{ipfs_hash}'
)
if response.status_code == 200:
return {"success": True}
else:
return {"success": False, "status_code": response.status_code}
def data_usage(self):
with self._resource as s:
response = s.get('https://api.pinata.cloud/data/userPinnedDataTotal')
return response.json()