From f89c7a7a8f7d69bcc19fd5b6f415ea049466b444 Mon Sep 17 00:00:00 2001 From: James Hilliard Date: Sat, 20 Jun 2020 19:17:12 -0600 Subject: [PATCH] Refactor binlog to use AsynchronousFileChannel API --- .../src/main/java/org/jpos/binlog/BinLog.java | 266 ++++++++++++------ .../java/org/jpos/binlog/BinLogReader.java | 44 ++- .../java/org/jpos/binlog/BinLogWriter.java | 95 +++++-- .../java/org/jpos/q2/cli/binlog/CUTOVER.java | 4 +- .../java/org/jpos/q2/cli/binlog/MONITOR.java | 4 +- .../test/java/org/jpos/binlog/BinLogTest.java | 17 +- 6 files changed, 310 insertions(+), 120 deletions(-) diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java index 955840e6f5..f8718afd1d 100644 --- a/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLog.java @@ -20,13 +20,22 @@ import org.jpos.iso.ISOUtil; -import java.io.*; +import java.io.EOFException; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.FileLock; +import java.nio.file.*; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; -import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardOpenOption.*; /** * General purpose binary log @@ -58,7 +67,7 @@ */ public abstract class BinLog implements AutoCloseable { private static final int FILE_MAGIC = 0xFC; - private static final int VERSION = 0x0001; + private static final short VERSION = 0x0001; private static final int RESERVED_LEN = 232; private static final int MAX_CREATE_ATTEMPTS = 100; protected static final int STATUS_OFFSET = Integer.BYTES + Short.BYTES; @@ -69,12 +78,12 @@ public abstract class BinLog implements AutoCloseable { private static final long CREATE_DELAY = 100L; protected static final long FIRST_EVENT_OFFSET = TAIL_OFFSET + RESERVED_LEN + Long.BYTES; private static SecureRandom numberGenerator = new SecureRandom(); - private static Pattern filePattern = Pattern.compile("^[\\d]{8}.dat$"); - private String mode; + private static String filePattern = "^[\\d]{8}.dat$"; + private OpenOption[] mode; private static Map mutexs = Collections.synchronizedMap(new HashMap<>()); - protected File dir; + protected Path dir; protected int fileNumber; - protected RandomAccessFile raf; + protected AsynchronousFileChannel raf; protected final Object mutex; /** @@ -84,15 +93,19 @@ public abstract class BinLog implements AutoCloseable { * @param create create directory if not exists * @throws IOException on error */ - protected BinLog(File dir, boolean create) throws IOException { - mutexs.putIfAbsent(dir.getAbsolutePath(), new Object()); - mutex = mutexs.get(dir.getAbsolutePath()); - if ((dir.exists() && !dir.isDirectory())|| (!dir.exists() && !create)) + protected BinLog(Path dir, boolean create) throws IOException { + mutexs.putIfAbsent(dir.toAbsolutePath().toString(), new Object()); + mutex = mutexs.get(dir.toAbsolutePath().toString()); + if ((Files.exists(dir) && !Files.isDirectory(dir))|| (!Files.exists(dir) && !create)) throw new IOException ("Invalid directory '" + dir.toString() + "'"); else - dir.mkdirs(); + Files.createDirectories(dir); this.dir = dir; - mode = create ? "rw" : "r"; + if (create) { + mode = new OpenOption[]{READ, WRITE, CREATE}; + } else { + mode = new OpenOption[]{READ}; + } } /** @@ -102,17 +115,17 @@ protected BinLog(File dir, boolean create) throws IOException { * @return a RandomAccessFile * @throws IOException on error */ - protected RandomAccessFile openOrCreateFile(File dir, int fileNumber) throws IOException { - File file = new File (dir, toFileName(fileNumber)); - for (int i=0; !file.exists() && i pos) throw new IOException ("Invalid tailoffset " + fileNumber + "/" + pos + "/" + currentTailOffset); - raf.seek(TAIL_OFFSET); - raf.writeLong(pos); + ByteBuffer tail = ByteBuffer.allocate(8); + tail.putLong(pos); + tail.flip(); + try { + int write = raf.write(tail, TAIL_OFFSET).get(); + if (write != 8) { + throw new IOException ("Failed to write 8 byte TAIL_OFFSET, return: " + write); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } } } @@ -237,34 +303,45 @@ protected int getFileNumber(String s) { return s != null ? Integer.parseInt(s.substring(0,8)) : 0; } - protected String getLastClosed (File dir) throws IOException { + protected String getLastClosed (Path dir) throws IOException { for (String s : getFilesReversed(dir)) { - if (isClosed(new File(dir, s))) + if (isClosed(dir.resolve(s))) return s; } return null; } - protected String getFirst (File dir) { - return Arrays.stream(dir.list()) - .filter(filePattern.asPredicate()) + protected String getFirst (Path dir) throws IOException { + return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false) + .map(Objects::toString) .sorted(String::compareTo) .findFirst() .orElse(null); } - private void verifyHeader(File file, RandomAccessFile raf) throws IOException { + private void verifyHeader(Path file, AsynchronousFileChannel raf) throws IOException { synchronized(mutex) { - if (raf.length() < TAIL_OFFSET + Long.BYTES) - throw new IOException ("Invalid jPOS BinLog file " + fileNumber + " (" + file.toString() + ": " + raf.length() + "/" + TAIL_OFFSET + Long.BYTES + ")"); - raf.seek(0); - int magic = raf.readInt(); + if (raf.size() < TAIL_OFFSET + Long.BYTES) + throw new IOException ("Invalid jPOS BinLog file " + fileNumber + " (" + file.toString() + ": " + raf.size() + "/" + TAIL_OFFSET + Long.BYTES + ")"); + ByteBuffer magicbuf = ByteBuffer.allocate(4); + try { + int read = raf.read(magicbuf, 0).get(); + if (read != 4) { + throw new IOException ("Failed to read 4 byte FILE_MAGIC, return: " + read); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + magicbuf.flip(); + int magic = magicbuf.getInt(); if (!(FILE_MAGIC == magic)) throw new IOException ("Invalid jPOS BinLog version " + fileNumber); long pos = readTailOffset(raf); if (pos < TAIL_OFFSET + Long.BYTES) throw new IOException ("Invalid jPOS BinLog header " + fileNumber + "/" + file.toString()); - long rafLength = raf.length(); + long rafLength = raf.size(); if (pos > rafLength) throw new IOException ("Truncated jPOS BinLog file " + fileNumber + " (" + pos + "/" + rafLength + ")"); } @@ -272,64 +349,89 @@ private void verifyHeader(File file, RandomAccessFile raf) throws IOException { private void lock (long timeout) throws IOException, InterruptedException { long end = System.currentTimeMillis() + timeout; - FileLock lock = raf.getChannel().lock(); + FileLock lock = raf.tryLock(); while (System.currentTimeMillis() < end) { Thread.sleep (10); } lock.release(); } - private List getFiles(File dir) { - return Arrays.stream(dir.list()) - .filter(filePattern.asPredicate()) + private List getFiles(Path dir) throws IOException { + return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false) + .map(Objects::toString) .sorted(String::compareTo) .collect(Collectors.toList()); } - private List getFilesReversed(File dir) { - return Arrays.stream(dir.list()) - .filter(filePattern.asPredicate()) + private List getFilesReversed(Path dir) throws IOException { + return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false) + .map(Objects::toString) .sorted((s1, s2) -> -s1.compareTo(s2)) .collect(Collectors.toList()); } - private boolean isClosed (File f) throws IOException { - if (f.exists()) { - try (RandomAccessFile raf = new RandomAccessFile(f, "r")) { - raf.seek(STATUS_OFFSET); - return Status.valueOf(raf.readShort()) == Status.CLOSED; + private boolean isClosed (Path f) throws IOException { + if (Files.exists(f)) { + try (AsynchronousFileChannel raf = AsynchronousFileChannel.open(f, READ)) { + ByteBuffer status = ByteBuffer.allocate(2); + try { + int read = raf.read(status, STATUS_OFFSET).get(); + if (read != 2) { + throw new IOException ("Failed to read 2 byte STATUS_OFFSET, return: " + read); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + status.flip(); + return Status.valueOf(status.getShort()) == Status.CLOSED; } } return false; } - private void writeHeader (RandomAccessFile r, int i) throws IOException { - r.seek(0); - r.writeInt (FILE_MAGIC); - r.writeShort (VERSION); - r.writeShort (Status.OPEN.intValue()); - r.writeLong(FIRST_EVENT_OFFSET); - r.writeInt(i); // this Log Number - r.writeInt(0); // next Log Number - r.write (new byte[RESERVED_LEN]); + private void writeHeader (AsynchronousFileChannel r, int i) throws IOException { + ByteBuffer header = ByteBuffer.allocate(256); + header.putInt(FILE_MAGIC); + header.putShort(VERSION); + header.putShort(Status.OPEN.val); + header.putLong(FIRST_EVENT_OFFSET); + header.putInt(i); // this Log Number + header.putInt(0); // next Log Number + header.put(new byte[RESERVED_LEN]); + header.flip(); + try { + int write = r.write(header, 0).get(); + if (write != 256) { + throw new IOException ("Failed to write 256 byte HEADER, return: " + write); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } } public enum Status { - OPEN(0), - CLOSED(1); + OPEN((short)0), + CLOSED((short)1); - private int val; + private short val; private static Map map = new HashMap<>(); static { for (Status s : Status.values()) { map.put (s.intValue(), s); } } - Status (int val) { + Status (short val) { this.val = val; } public int intValue() { return val; } + public short shortValue() { + return val; + } public static Status valueOf (int i) { return map.get(i); } diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java index ef287b2b47..be8d023c5c 100644 --- a/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLogReader.java @@ -18,10 +18,13 @@ package org.jpos.binlog; -import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; /** * Used to iterate over a binlog @@ -39,7 +42,7 @@ public class BinLogReader extends BinLog implements Iterator { * @param dir this Binlog's directory * @throws IOException on error */ - public BinLogReader (File dir) throws IOException { + public BinLogReader (Path dir) throws IOException { super (dir, false); int first = getFileNumber(getFirst(dir)); this.iteratorPos = FIRST_EVENT_OFFSET; @@ -55,7 +58,7 @@ public BinLogReader (File dir) throws IOException { * @param ref reference to a given entry * @throws IOException on error */ - public BinLogReader (File dir, BinLog.Ref ref) throws IOException { + public BinLogReader (Path dir, BinLog.Ref ref) throws IOException { super (dir, false); this.iteratorPos = ref.getOffset(); raf = open (dir, fileNumber = ref.getFileNumber()); @@ -70,7 +73,7 @@ public BinLogReader (File dir, BinLog.Ref ref) throws IOException { * @throws IOException on error */ public BinLogReader(String dir) throws IOException { - this (new File(dir)); + this (Paths.get(dir)); } /** @@ -83,7 +86,7 @@ public BinLogReader(String dir) throws IOException { * @throws IOException on error */ public BinLogReader (String dir, BinLog.Ref ref) throws IOException { - this (new File(dir), ref); + this (Paths.get(dir), ref); } /** @@ -157,7 +160,7 @@ public BinLog.Entry next() { long size = 0L; try { actualTailOffset = readTailOffset(raf); - size = raf.getChannel().size(); + size = raf.size(); } catch (IOException ignored) { } throw new NoSuchElementException( @@ -170,10 +173,33 @@ public BinLog.Entry next() { private byte[] read (long pos) throws IOException { int len = 0; try { - raf.seek(pos); - len = raf.readInt(); + ByteBuffer lenbuf = ByteBuffer.allocate(4); + try { + int read = raf.read(lenbuf, pos).get(); + if (read != 4) { + throw new IOException ("Failed to read 4 byte data length, return: " + read); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + lenbuf.flip(); + len = lenbuf.getInt(); byte[] buf = new byte[len]; - raf.read(buf); + ByteBuffer bytebuf = ByteBuffer.allocate(len); + try { + int read = raf.read(bytebuf, pos + 4).get(); + if (read != len) { + throw new IOException ("Failed to read " + len + " byte data, return: " + read); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + bytebuf.flip(); + bytebuf.get(buf, 0, len); return buf; } catch (IOException e) { throw new IOException ("Error reading position " + pos + " length " + len, e); diff --git a/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java b/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java index 18c3eb4b08..574ea371e9 100644 --- a/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java +++ b/modules/binlog/src/main/java/org/jpos/binlog/BinLogWriter.java @@ -18,11 +18,14 @@ package org.jpos.binlog; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.FileLock; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * Used to append add records to a BinLog @@ -36,7 +39,7 @@ public class BinLogWriter extends BinLog { * @param dir pointer to this Binlog's directory * @throws IOException on error */ - public BinLogWriter(File dir) throws IOException { + public BinLogWriter(Path dir) throws IOException { super (dir, true); int last = getFileNumber(getLastClosed(dir)); if (last == 0) { @@ -56,7 +59,7 @@ public BinLogWriter(File dir) throws IOException { * @throws IOException on error */ public BinLogWriter(String dir) throws IOException { - this (new File(dir)); + this (Paths.get(dir)); } /** @@ -68,16 +71,40 @@ public BinLogWriter(String dir) throws IOException { public BinLog.Ref add(byte[] record) throws IOException { synchronized(mutex) { checkCutover(true); - FileChannel channel = raf.getChannel(); - try (FileLock lock = channel.lock()) { + AsynchronousFileChannel channel = raf; + Future lockfut = channel.lock(); + FileLock lock = null; + try { + lock = lockfut.get(); + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + if (lock.isValid()) { long pos = readTailOffset(raf); - raf.seek(pos); - raf.writeInt(record.length); - raf.write(record); + int length = 4 + record.length; + ByteBuffer tail = ByteBuffer.allocate(length); + tail.putInt(record.length); + tail.put(record); + tail.flip(); + try { + int write = raf.write(tail, pos).get(); + if (write != length) { + throw new IOException ("Failed to write " + length + " byte record, return: " + write); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } channel.force(true); writeTailOffset(pos + Integer.BYTES + record.length); channel.force(false); + lock.release(); return new BinLog.Ref(fileNumber, pos); + } else { + throw new IOException ("Failed to acquire file lock"); } } } @@ -89,18 +116,52 @@ public BinLog.Ref add(byte[] record) throws IOException { public void cutover () throws IOException { synchronized(mutex) { checkCutover(true); - FileChannel channel = raf.getChannel(); - try (FileLock lock = channel.lock()) { + AsynchronousFileChannel channel = raf; + Future lockfut = channel.lock(); + FileLock lock = null; + try { + lock = lockfut.get(); + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } + if (lock.isValid()) { if (readStatus() != Status.OPEN) throw new IOException ("BinLog not open"); - RandomAccessFile newRaf = openOrCreateFile(dir, ++fileNumber); - raf.seek(NEXT_LOG_INDEX_OFFSET); - raf.writeInt(fileNumber); + AsynchronousFileChannel newRaf = openOrCreateFile(dir, ++fileNumber); + ByteBuffer index = ByteBuffer.allocate(4); + index.putInt(fileNumber); + index.flip(); + try { + int write = raf.write(index, NEXT_LOG_INDEX_OFFSET).get(); + if (write != 4) { + throw new IOException ("Failed to write 4 byte NEXT_LOG_INDEX_OFFSET, return: " + write); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } channel.force(false); - raf.seek(STATUS_OFFSET); - raf.writeShort(Status.CLOSED.intValue()); + ByteBuffer status = ByteBuffer.allocate(2); + status.putShort(Status.CLOSED.shortValue()); + status.flip(); + try { + int write = raf.write(status, STATUS_OFFSET).get(); + if (write != 2) { + throw new IOException ("Failed to write 2 byte STATUS_OFFSET, return: " + write); + } + } catch (InterruptedException e) { + throw new IOException (e.getMessage()); + } catch (ExecutionException e) { + throw new IOException (e.getMessage()); + } channel.force(false); raf = newRaf; + lock.release(); + } else { + throw new IOException ("Failed to acquire file lock"); } } } diff --git a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java index 99609521c2..dd2e6c9a79 100644 --- a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java +++ b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/CUTOVER.java @@ -21,7 +21,7 @@ import org.jpos.binlog.BinLogWriter; import org.jpos.q2.CLICommand; import org.jpos.q2.CLIContext; -import java.io.File; +import java.nio.file.Paths; @SuppressWarnings("unused") public class CUTOVER implements CLICommand { @@ -30,7 +30,7 @@ public class CUTOVER implements CLICommand { public void exec(CLIContext cli, String[] args) throws Exception { boolean quiet = args.length > 1 && "-q".equals (args[1]); - try (BinLogWriter bl = new BinLogWriter(new File((String) cli.getUserData().get("binlog")))) { + try (BinLogWriter bl = new BinLogWriter(Paths.get((String) cli.getUserData().get("binlog")))) { int oldFile = bl.getFileNumber(); bl.cutover(); if (!quiet) diff --git a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java index 02c7ab6117..a65ac8be54 100644 --- a/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java +++ b/modules/binlog/src/main/java/org/jpos/q2/cli/binlog/MONITOR.java @@ -23,7 +23,7 @@ import org.jpos.iso.ISOUtil; import org.jpos.q2.CLICommand; import org.jpos.q2.CLIContext; -import java.io.File; +import java.nio.file.Paths; @SuppressWarnings("unused") public class MONITOR implements CLICommand { @@ -31,7 +31,7 @@ public class MONITOR implements CLICommand { boolean ansi; public void exec(CLIContext cli, String[] args) throws Exception { - try (BinLogReader bl = new BinLogReader(new File((String) cli.getUserData().get("binlog")))) { + try (BinLogReader bl = new BinLogReader(Paths.get((String) cli.getUserData().get("binlog")))) { while (bl.hasNext(10000L)) { BinLog.Entry ref = bl.next(); cli.println(String.format("%06d@%08d %s", diff --git a/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java b/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java index b9c41bf416..5f811f6fe2 100644 --- a/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java +++ b/modules/binlog/src/test/java/org/jpos/binlog/BinLogTest.java @@ -30,8 +30,9 @@ import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; import org.junit.jupiter.api.Order; -import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.concurrent.atomic.AtomicLong; import static org.apache.commons.lang3.JavaVersion.JAVA_9; @@ -41,7 +42,7 @@ @TestMethodOrder(OrderAnnotation.class) public class BinLogTest implements Runnable { - public static File dir; + public static Path dir; private AtomicLong cnt = new AtomicLong(); @BeforeEach @@ -56,8 +57,8 @@ public void before () { @BeforeAll public static void setup () throws IOException { - dir = File.createTempFile("binlog-", ""); - dir.delete(); + dir = Files.createTempFile("binlog-", ""); + Files.delete(dir); System.out.println ("TEMP=" + dir); // dir = new File("/tmp/binlog"); } @@ -99,15 +100,15 @@ public void run() { @AfterAll public static void cleanup() throws IOException { - if (dir.listFiles() != null) { - for (File f : dir.listFiles()) { + if (Files.list(dir) != null) { + for (Path f : Files.newDirectoryStream(dir)) { if (f.toString().endsWith(".dat")) { System.out.println ("Deleting " + f.toString()); - f.delete(); + Files.delete(f); } } } System.out.println ("Deleting " + dir); - dir.delete(); + Files.delete(dir); } }