Skip to content

Commit

Permalink
[ZEPPELIN-5744] Allow NoteManager for concurrent operation (apache#4374)
Browse files Browse the repository at this point in the history
  • Loading branch information
saLeox authored and akoira committed Feb 1, 2024
1 parent 2f456fa commit d5e5b74
Showing 1 changed file with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,118 @@ public void run(int index) throws IOException {
super.notes.remove(index);
}
}

@Test
public void testConcurrentOperation() throws Exception {
int threshold = 10, noteNum = 150;
Map<Integer, String> notes = new ConcurrentHashMap<>();
ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
// Save note concurrently
ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, notes, "/prod/note%s");
saveNote.exec();
// Move note concurrently
ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, notes, "/dev/project_%s/my_note%s");
moveNote.exec();
// Move folder concurrently
ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, noteNum, notes, "/staging/note_%s/my_note%s");
moveFolder.exec();
// Remove note concurrently
ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, noteNum, notes, null);
removeNote.exec();
threadPool.shutdown();
}

abstract class ConcurrentTask {
private ExecutorService threadPool;
private int noteNum;
private Map<Integer, String> notes;
private String pathPattern;

public ConcurrentTask(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
this.threadPool = threadPool;
this.noteNum = noteNum;
this.notes = notes;
this.pathPattern = pathPattern;
}

public abstract void run(int index) throws IOException;

public void exec() throws Exception {
// Simulate concurrent operation
CountDownLatch latch = new CountDownLatch(noteNum);
for (int i = 0; i < noteNum; i++) {
int index = i;
threadPool.execute(() -> {
try {
this.run(index);
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
});
}
// wait till all tasks are completed with 5 seconds as timeout threshold
assertTrue(latch.await(5, TimeUnit.SECONDS));
this.checkPathByPattern();
}

private void checkPathByPattern() throws IOException {
assertEquals(this.notes.size(), noteManager.getNotesInfo().size());
if (notes.isEmpty()) return;
for (Integer key : this.notes.keySet()) {
String expectPath = String.format(this.pathPattern, key, key);
assertEquals(expectPath, noteManager.processNote(notes.get(key), n -> n).getPath());
}
}
}

class ConcurrentTaskSaveNote extends ConcurrentTask {
public ConcurrentTaskSaveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
super(threadPool, noteNum, notes, pathPattern);
}

@Override
public void run(int index) throws IOException {
String tarPath = String.format(super.pathPattern, index, index);
Note note = createNote(tarPath);
noteManager.saveNote(note);
super.notes.put(index, note.getId());
}
}

class ConcurrentTaskMoveNote extends ConcurrentTask {
public ConcurrentTaskMoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
super(threadPool, noteNum, notes, pathPattern);
}

@Override
public void run(int index) throws IOException {
String tarPath = String.format(super.pathPattern, index, index);
noteManager.moveNote(super.notes.get(index), tarPath, AuthenticationInfo.ANONYMOUS);
}
}

class ConcurrentTaskMoveFolder extends ConcurrentTask {
public ConcurrentTaskMoveFolder(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
super(threadPool, noteNum, notes, pathPattern);
}

@Override
public void run(int index) throws IOException {
String curPath = "/dev/project_" + index, tarPath = "/staging/note_" + index;
noteManager.moveFolder(curPath, tarPath, AuthenticationInfo.ANONYMOUS);
}
}

class ConcurrentTaskRemoveNote extends ConcurrentTask {
public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
super(threadPool, noteNum, notes, pathPattern);
}

@Override
public void run(int index) throws IOException {
noteManager.removeNote(super.notes.get(index), AuthenticationInfo.ANONYMOUS);
super.notes.remove(index);
}
}
}

0 comments on commit d5e5b74

Please # to comment.