Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1923 from Saartank/object-prestagi…
Browse files Browse the repository at this point in the history
…ng-tool-jan21

Add Prestage Functionality to Client
  • Loading branch information
turetske authored Jan 28, 2025
2 parents afd33af + 5f572b3 commit 0d3fb1c
Show file tree
Hide file tree
Showing 16 changed files with 813 additions and 91 deletions.
11 changes: 11 additions & 0 deletions client/acquire_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ func (tg *tokenGenerator) SetToken(contents string) {
tg.Token.Store(&info)
}

// Copy the contents
func (tg *tokenGenerator) Copy() *tokenGenerator {
return &tokenGenerator{
DirResp: tg.DirResp,
Destination: tg.Destination,
IsWrite: tg.IsWrite,
EnableAcquire: tg.EnableAcquire,
Sync: new(singleflight.Group),
}
}

// Read a token from a file; ensure
func getTokenFromFile(tokenLocation string) (string, error) {
//Read in the JSON
Expand Down
83 changes: 83 additions & 0 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,3 +976,86 @@ func TestTokenGenerate(t *testing.T) {
assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes)
}
}

func TestPrestage(t *testing.T) {
server_utils.ResetTestState()
defer server_utils.ResetTestState()
fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)

te, err := client.NewTransferEngine(fed.Ctx)
require.NoError(t, err)

// Other set-up items:
// The cache will open the file to stat it, downloading the first block.
// Make sure we are greater than 64kb in size.
testFileContent := strings.Repeat("test file content", 10000)
// Create the temporary file to upload
tempFile, err := os.CreateTemp(t.TempDir(), "test")
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

tempToken, _ := getTempToken(t)
defer tempToken.Close()
defer os.Remove(tempToken.Name())
// Disable progress bars to not reuse the same mpb instance
viper.Set("Logging.DisableProgressBars", true)

oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix)
assert.NoError(t, err)
defer func() {
_, err := config.SetPreferredPrefix(oldPref)
require.NoError(t, err)
}()

// Set path for object to upload/download
for _, export := range fed.Exports {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)

// Upload the file with COPY
transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
assert.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes)

// Check the cache info twice, make sure it's not cached.
tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
innerFileUrl, err := url.Parse(uploadURL)
require.NoError(t, err)
age, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
// Due to an xrootd limitation, CacheInfo performs a GET request instead of a HEAD request.
// Once this limitation is resolved and CacheInfo is updated accordingly,
// the assertion should be changed to -1 instead of 0.
assert.Equal(t, 0, age)

age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
// Due to an xrootd limitation, CacheInfo performs a GET request instead of a HEAD request.
// Once this limitation is resolved and CacheInfo is updated accordingly,
// the assertion should be changed to -1 instead of 0.
assert.Equal(t, 0, age)

// Prestage the object
tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl)
require.NoError(t, err)
err = tc.Submit(tj)
require.NoError(t, err)
results, err := tc.Shutdown()
require.NoError(t, err)
assert.Equal(t, 1, len(results))

// Check if object is cached.
age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
require.NotEqual(t, -1, age)
}
}
Loading

0 comments on commit 0d3fb1c

Please # to comment.