|
2 | 2 | # -*- coding: utf-8 -*-
|
3 | 3 |
|
4 | 4 | import os
|
| 5 | +import pathlib |
5 | 6 | import shutil
|
6 | 7 | from io import BytesIO
|
7 | 8 |
|
@@ -75,6 +76,7 @@ def test_api_files_post_admin():
|
75 | 76 | )
|
76 | 77 | assert r.status_code == 200
|
77 | 78 | f = Files.query.filter_by(id=1).first()
|
| 79 | + assert f.sha1sum == "9032bbc224ed8b39183cb93b9a7447727ce67f9d" |
78 | 80 | os.remove(os.path.join(app.config["UPLOAD_FOLDER"] + "/" + f.location))
|
79 | 81 | destroy_ctfd(app)
|
80 | 82 |
|
@@ -137,3 +139,104 @@ def test_api_file_delete_admin():
|
137 | 139 | shutil.rmtree(os.path.dirname(path), ignore_errors=True)
|
138 | 140 |
|
139 | 141 | destroy_ctfd(app)
|
| 142 | + |
| 143 | + |
| 144 | +def test_api_file_custom_location(): |
| 145 | + """ |
| 146 | + Test file uploading with custom location |
| 147 | + """ |
| 148 | + app = create_ctfd() |
| 149 | + with app.app_context(): |
| 150 | + with login_as_user(app, name="admin") as client: |
| 151 | + with client.session_transaction() as sess: |
| 152 | + nonce = sess.get("nonce") |
| 153 | + r = client.post( |
| 154 | + "/api/v1/files", |
| 155 | + content_type="multipart/form-data", |
| 156 | + data={ |
| 157 | + "file": (BytesIO(b"test file content"), "test.txt"), |
| 158 | + "location": "testing/asdf.txt", |
| 159 | + "nonce": nonce, |
| 160 | + }, |
| 161 | + ) |
| 162 | + assert r.status_code == 200 |
| 163 | + f = Files.query.filter_by(id=1).first() |
| 164 | + assert f.sha1sum == "9032bbc224ed8b39183cb93b9a7447727ce67f9d" |
| 165 | + assert f.location == "testing/asdf.txt" |
| 166 | + r = client.get("/files/" + f.location) |
| 167 | + assert r.get_data(as_text=True) == "test file content" |
| 168 | + |
| 169 | + r = client.get("/api/v1/files/1") |
| 170 | + response = r.get_json() |
| 171 | + assert ( |
| 172 | + response["data"]["sha1sum"] |
| 173 | + == "9032bbc224ed8b39183cb93b9a7447727ce67f9d" |
| 174 | + ) |
| 175 | + assert response["data"]["location"] == "testing/asdf.txt" |
| 176 | + |
| 177 | + # Test deletion |
| 178 | + r = client.delete("/api/v1/files/1", json="") |
| 179 | + assert r.status_code == 200 |
| 180 | + assert Files.query.count() == 0 |
| 181 | + |
| 182 | + target = pathlib.Path(app.config["UPLOAD_FOLDER"]) / f.location |
| 183 | + assert target.exists() is False |
| 184 | + |
| 185 | + # Test invalid locations |
| 186 | + invalid_paths = [ |
| 187 | + "testing/prefix/asdf.txt", |
| 188 | + "/testing/asdf.txt", |
| 189 | + "asdf.txt", |
| 190 | + ] |
| 191 | + for path in invalid_paths: |
| 192 | + r = client.post( |
| 193 | + "/api/v1/files", |
| 194 | + content_type="multipart/form-data", |
| 195 | + data={ |
| 196 | + "file": (BytesIO(b"test file content"), "test.txt"), |
| 197 | + "location": path, |
| 198 | + "nonce": nonce, |
| 199 | + }, |
| 200 | + ) |
| 201 | + assert r.status_code == 400 |
| 202 | + destroy_ctfd(app) |
| 203 | + |
| 204 | + |
| 205 | +def test_api_file_overwrite_by_location(): |
| 206 | + """ |
| 207 | + Test file overwriting with a specific location |
| 208 | + """ |
| 209 | + app = create_ctfd() |
| 210 | + with app.app_context(): |
| 211 | + with login_as_user(app, name="admin") as client: |
| 212 | + with client.session_transaction() as sess: |
| 213 | + nonce = sess.get("nonce") |
| 214 | + r = client.post( |
| 215 | + "/api/v1/files", |
| 216 | + content_type="multipart/form-data", |
| 217 | + data={ |
| 218 | + "file": (BytesIO(b"test file content"), "test.txt"), |
| 219 | + "location": "testing/asdf.txt", |
| 220 | + "nonce": nonce, |
| 221 | + }, |
| 222 | + ) |
| 223 | + assert r.status_code == 200 |
| 224 | + f = Files.query.filter_by(id=1).first() |
| 225 | + r = client.get("/files/" + f.location) |
| 226 | + assert r.get_data(as_text=True) == "test file content" |
| 227 | + |
| 228 | + r = client.post( |
| 229 | + "/api/v1/files", |
| 230 | + content_type="multipart/form-data", |
| 231 | + data={ |
| 232 | + "file": (BytesIO(b"testing new uploaded file content"), "test.txt"), |
| 233 | + "location": "testing/asdf.txt", |
| 234 | + "nonce": nonce, |
| 235 | + }, |
| 236 | + ) |
| 237 | + assert r.status_code == 200 |
| 238 | + f = Files.query.filter_by(id=1).first() |
| 239 | + r = client.get("/files/" + f.location) |
| 240 | + assert f.sha1sum == "0ee7eb85ac0b8d8ae03f3080589157cde553b13f" |
| 241 | + assert r.get_data(as_text=True) == "testing new uploaded file content" |
| 242 | + destroy_ctfd(app) |
0 commit comments