diff --git a/src/main/java/org/wildfly/transaction/client/LocalTransaction.java b/src/main/java/org/wildfly/transaction/client/LocalTransaction.java index f308928..fbd7a1e 100644 --- a/src/main/java/org/wildfly/transaction/client/LocalTransaction.java +++ b/src/main/java/org/wildfly/transaction/client/LocalTransaction.java @@ -34,6 +34,7 @@ import org.wildfly.common.Assert; import org.wildfly.transaction.client._private.Log; +import org.wildfly.transaction.client.spi.LocalTransactionProvider; /** * A transaction from a local transaction provider. @@ -134,6 +135,10 @@ void unimportBacking() { } } + LocalTransactionProvider getProvider() { + return owner.getProvider(); + } + public void setRollbackOnly() throws IllegalStateException, SystemException { transaction.setRollbackOnly(); } diff --git a/src/main/java/org/wildfly/transaction/client/SimpleXid.java b/src/main/java/org/wildfly/transaction/client/SimpleXid.java index 2c3c105..ab78392 100644 --- a/src/main/java/org/wildfly/transaction/client/SimpleXid.java +++ b/src/main/java/org/wildfly/transaction/client/SimpleXid.java @@ -116,6 +116,31 @@ public int compareTo(final SimpleXid o) { return res; } + public String toHexString() { + StringBuilder b = new StringBuilder(); + toHexString(b); + return b.toString(); + } + + private void toHexString(StringBuilder builder) { + builder.append(Integer.toHexString(formatId)).append(':'); + for (final byte x : globalId) { + final int v = x & 0xff; + if (v < 16) { + builder.append('0'); + } + builder.append(Integer.toHexString(v)); + } + builder.append(':'); + for (final byte x : branchId) { + final int v = x & 0xff; + if (v < 16) { + builder.append('0'); + } + builder.append(Integer.toHexString(v)); + } + } + private static int compareByteArrays(byte[] a1, byte[] a2) { final int l1 = a1.length; final int l2 = a2.length; @@ -130,22 +155,8 @@ private static int compareByteArrays(byte[] a1, byte[] a2) { public String toString() { StringBuilder b = new StringBuilder(); - b.append("XID [").append(Integer.toHexString(formatId)).append(':'); - for (final byte x : globalId) { - final int v = x & 0xff; - if (v < 16) { - b.append('0'); - } - b.append(Integer.toHexString(v)); - } - b.append(':'); - for (final byte x : branchId) { - final int v = x & 0xff; - if (v < 16) { - b.append('0'); - } - b.append(Integer.toHexString(v)); - } + b.append("XID ["); + toHexString(b); b.append(']'); return b.toString(); } diff --git a/src/main/java/org/wildfly/transaction/client/SubordinateXAResource.java b/src/main/java/org/wildfly/transaction/client/SubordinateXAResource.java index 9ce4c7a..6b1efea 100644 --- a/src/main/java/org/wildfly/transaction/client/SubordinateXAResource.java +++ b/src/main/java/org/wildfly/transaction/client/SubordinateXAResource.java @@ -46,6 +46,7 @@ final class SubordinateXAResource implements XAResource, XARecoverable, Serializ private final URI location; private final String parentName; + private final XAResourceRegistry resourceRegistry; private volatile int timeout = LocalTransactionContext.DEFAULT_TXN_TIMEOUT; private long startTime = 0L; private volatile Xid xid; @@ -53,15 +54,22 @@ final class SubordinateXAResource implements XAResource, XARecoverable, Serializ private final AtomicInteger stateRef = new AtomicInteger(0); - SubordinateXAResource(final URI location, final String parentName) { + SubordinateXAResource(final URI location, final String parentName, XAResourceRegistry recoveryRegistry) { this.location = location; this.parentName = parentName; + this.resourceRegistry = recoveryRegistry; + } + + SubordinateXAResource(final URI location, final String parentName, final int flags, XAResourceRegistry recoveryRegistry) { + this(location, parentName, recoveryRegistry); + stateRef.set(flags); } SubordinateXAResource(final URI location, final int flags, final String parentName) { this.location = location; this.parentName = parentName; stateRef.set(flags); + this.resourceRegistry = null; } Xid getXid() { @@ -136,15 +144,41 @@ public void beforeCompletion(final Xid xid) throws XAException { } public int prepare(final Xid xid) throws XAException { - return commitToEnlistment() ? lookup(xid).prepare() : XA_RDONLY; + final int result; + try { + result = commitToEnlistment() ? lookup(xid).prepare() : XA_RDONLY; + } catch (XAException | RuntimeException exception) { + if (resourceRegistry != null) + resourceRegistry.resourceInDoubt(this); + throw exception; + } + if (resourceRegistry != null) + resourceRegistry.removeResource(this); + return result; } public void commit(final Xid xid, final boolean onePhase) throws XAException { - if (commitToEnlistment()) lookup(xid).commit(onePhase); + try { + if (commitToEnlistment()) lookup(xid).commit(onePhase); + } catch (XAException | RuntimeException exception) { + if (onePhase && resourceRegistry != null) + resourceRegistry.resourceInDoubt(this); + throw exception; + } + if (onePhase && resourceRegistry != null) + resourceRegistry.removeResource(this); } public void rollback(final Xid xid) throws XAException { - if (commitToEnlistment()) lookup(xid).rollback(); + try { + if (commitToEnlistment()) lookup(xid).rollback(); + } catch (XAException | RuntimeException e) { + if (resourceRegistry != null) + resourceRegistry.resourceInDoubt(this); + throw e; + } + if (resourceRegistry != null) + resourceRegistry.removeResource(this); } public void forget(final Xid xid) throws XAException { diff --git a/src/main/java/org/wildfly/transaction/client/XAOutflowedResources.java b/src/main/java/org/wildfly/transaction/client/XAOutflowedResources.java index b1f9dd6..0ea3a9c 100644 --- a/src/main/java/org/wildfly/transaction/client/XAOutflowedResources.java +++ b/src/main/java/org/wildfly/transaction/client/XAOutflowedResources.java @@ -55,7 +55,11 @@ SubordinateXAResource getOrEnlist(final URI location, final String parentName) t if (xaResource != null) { return xaResource; } - xaResource = new SubordinateXAResource(location, parentName); + final XAResourceRegistry resourceRegistry = transaction.getProvider().getXAResourceRegistry(transaction); + xaResource = new SubordinateXAResource(location, parentName, resourceRegistry); + if (resourceRegistry != null) { + resourceRegistry.addResource(xaResource, location); + } if (! transaction.enlistResource(xaResource)) { throw Log.log.couldNotEnlist(); } diff --git a/src/main/java/org/wildfly/transaction/client/XAResourceRegistry.java b/src/main/java/org/wildfly/transaction/client/XAResourceRegistry.java new file mode 100644 index 0000000..97e00d2 --- /dev/null +++ b/src/main/java/org/wildfly/transaction/client/XAResourceRegistry.java @@ -0,0 +1,75 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2018 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wildfly.transaction.client; + + +import javax.transaction.SystemException; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import java.net.URI; + +import static org.wildfly.transaction.client.OutflowHandleManager.FL_COMMITTED; +import static org.wildfly.transaction.client.OutflowHandleManager.FL_CONFIRMED; + +/** + * Registry that keeps track of outflowed resources info for a specific transaction. + * + * Used for recovery of those resources when they enter in doubt state. + * + * @author Flavia Rainone + */ +public abstract class XAResourceRegistry { + + /** + * Adds a XA resource to this registry. + * + * @param resource the resource + * @param uri the resource URI location + * @throws SystemException if there is a problem recording this resource + */ + protected abstract void addResource(XAResource resource, URI uri) throws SystemException; + + /** + * Removes a XA resource from this registry. + * + * @param resource the resource + * @throws XAException if there is a problem deleting this resource + */ + protected abstract void removeResource(XAResource resource) throws XAException; + + /** + * Flags the previously added resource as in doubt, meaning that it failed to complete prepare, rollback or commit. + * It can be invoked more than once for the same resource if that resource repeatedly fails to perform those + * operations, such as a resource that first fails to prepare, and then fails to rollback. + * + * @param resource the resource + */ + protected abstract void resourceInDoubt(XAResource resource); + + /** + * Reloads an in doubt resource, recreating a previously lost remote XA resource object. This method + * must be invoked to recreate in doubt resources after a server shutdown or crash. + * + * @param uri the URI where the outflowed resource is located + * @param nodeName the node name of the resource + * @return a newly-created resource representing a previously lost XA resource that is in doubt + */ + protected XAResource reloadInDoubtResource(URI uri, String nodeName) { + return new SubordinateXAResource(uri, nodeName, FL_COMMITTED | FL_CONFIRMED, this); + } +} diff --git a/src/main/java/org/wildfly/transaction/client/_private/Log.java b/src/main/java/org/wildfly/transaction/client/_private/Log.java index ab830b1..fcd2e89 100644 --- a/src/main/java/org/wildfly/transaction/client/_private/Log.java +++ b/src/main/java/org/wildfly/transaction/client/_private/Log.java @@ -18,8 +18,11 @@ package org.wildfly.transaction.client._private; +import java.io.File; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; import java.security.Permission; import java.util.ServiceConfigurationError; @@ -58,6 +61,13 @@ public interface Log extends BasicLogger { @Message(value = "Subordinate XAResource at %s") String subordinateXaResource(URI location); + // Warn + + @LogMessage(level = Logger.Level.WARN) + @Message(value = "Unknown I/O error when listing xa resource recovery files in %s (File.list() returned null)") + void listXAResourceRecoveryFilesNull(File dir); + + // Debug @LogMessage(level = Logger.Level.DEBUG) @@ -82,6 +92,26 @@ public interface Log extends BasicLogger { @Message(value = "Failure on running doRecover during initialization") void doRecoverFailureOnIntialization(@Cause Throwable e); + @LogMessage(level = Logger.Level.TRACE) + @Message (value = "Created xa resource recovery file: %s") + void xaResourceRecoveryFileCreated(Path path); + + @LogMessage(level = Logger.Level.TRACE) + @Message (value = "Deleted xa resource recovery file: %s") + void xaResourceRecoveryFileDeleted(Path path); + + @LogMessage(level = Logger.Level.TRACE) + @Message(value = "Reloaded xa resource recovery registry file: %s") + void xaResourceRecoveryRegistryReloaded(Path filePath); + + @LogMessage(level = Logger.Level.TRACE) + @Message(value = "Added resource (%s) to xa resource recovery registry %s") + void xaResourceAddedToRecoveryRegistry(URI uri, Path filePath); + + @LogMessage(level = Logger.Level.TRACE) + @Message(value = "Recovered in doubt xa resource (%s) from xa resource recovery registry %s") + void xaResourceRecoveredFromRecoveryRegistry(URI uri, Path filePath); + // Regular messages @Message(id = 0, value = "No transaction associated with the current thread") @@ -357,4 +387,22 @@ public interface Log extends BasicLogger { @Message(id = 90, value = "Cannot assign location \"%s\" to transaction because it is already located at \"%s\"") IllegalStateException locationAlreadyInitialized(URI newLocation, URI oldLocation); + + @Message(id = 91, value = "Failed to create xa resource recovery file: %s") + SystemException createXAResourceRecoveryFileFailed(Path filePath, @Cause IOException e); + + @Message(id = 92, value = "Failed to append xa resource (%s) to xa recovery file: %s") + SystemException appendXAResourceRecoveryFileFailed(URI uri, Path filePath, @Cause IOException e); + + @Message(id = 93, value = "Failed to delete xa recovery registry file %s on removal of %s") + XAException deleteXAResourceRecoveryFileFailed(@Field int errorCode, Path filePath, XAResource resource, @Cause IOException e); + + @Message(id = 94, value = "Failed to read xa resource recovery file %s") + IOException readXAResourceRecoveryFileFailed(Path filePath, @Cause IOException e); + + @Message(id = 95, value = "Failed to read URI '%s' from xa resource recovery file %s") + IOException readURIFromXAResourceRecoveryFileFailed(String uriString, Path filePath, @Cause URISyntaxException e); + + @Message(id = 96, value = "Unexpected exception on XA recovery") + IllegalStateException unexpectedExceptionOnXAResourceRecovery(@Cause IOException e); } diff --git a/src/main/java/org/wildfly/transaction/client/provider/jboss/FileSystemXAResourceRegistry.java b/src/main/java/org/wildfly/transaction/client/provider/jboss/FileSystemXAResourceRegistry.java new file mode 100644 index 0000000..8253aa2 --- /dev/null +++ b/src/main/java/org/wildfly/transaction/client/provider/jboss/FileSystemXAResourceRegistry.java @@ -0,0 +1,320 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2018 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wildfly.transaction.client.provider.jboss; + +import org.wildfly.common.annotation.NotNull; +import org.wildfly.transaction.client.LocalTransaction; +import org.wildfly.transaction.client.SimpleXid; +import org.wildfly.transaction.client.XAResourceRegistry; +import org.wildfly.transaction.client._private.Log; +import org.wildfly.transaction.client.spi.LocalTransactionProvider; + +import javax.transaction.SystemException; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * A registry persisted in a series of log files, containing all outflowed resources info for unfinished transactions. + * This registry is created whenever a subordinate resource is outflowed to a remote location, + * and deleted only when all outflowed resources participating in that transaction are successfully prepared, + * committed or rolled back (the two latter in the case of a one-phase commit). + * + * Used for {@link #getInDoubtXAResources()} recovery of in doubt resources}. + * + * @author Flavia Rainone + */ +final class FileSystemXAResourceRegistry { + + /** + * Name of recovery dir. Location of this dir can be defined at constructor. + */ + private static final String RECOVERY_DIR = "ejb-xa-recovery"; + + /** + * Empty utility array. + */ + private static final XAResource[] EMPTY_IN_DOUBT_RESOURCES = new XAResource[0]; + + /** + * Key for keeeping the xa resource registry associated with a local transaction + */ + private static final Object XA_RESOURCE_REGISTRY_KEY = new Object(); + + /** + * The local transaction provider associated with this file system XAResource registry + */ + private final LocalTransactionProvider provider; + + /** + * The xa recovery path, i.e., the path containing {@link #RECOVERY_DIR}. + */ + private final Path xaRecoveryPath; + + /** + * A set containing the list of all registry files that are currently open. Used as a support to + * identify in doubt registries. See {@link #recoverInDoubtRegistries}. + */ + private final Set openFilePaths = Collections.synchronizedSet(new HashSet<>()); + + /** + * A set of in doubt resources, i.e., outflowed resources whose prepare/rollback/commit operation was not + * completed normally, or resources that have been recovered from in doubt registries. See + * {@link XAResourceRegistryFile#resourceInDoubt} and {@link XAResourceRegistryFile#loadInDoubtResources}. + */ + private final Set inDoubtResources = Collections.synchronizedSet(new HashSet<>()); // it is a set because we could have an in doubt resource reincide in failure to complete + + /** + * Creates a FileSystemXAResourceRegistry. + * + * @param relativePath the path recovery dir is relative to + */ + FileSystemXAResourceRegistry (LocalTransactionProvider provider, Path relativePath) { + this.provider = provider; + if (relativePath == null) + this.xaRecoveryPath = FileSystems.getDefault().getPath(RECOVERY_DIR); + else + this.xaRecoveryPath = relativePath.resolve(RECOVERY_DIR); + } + + /** + * Returns the XAResourceRegistry file for {@code transaction}. + * + * @param transaction the transaction + * @return the XAResourceRegistry for {@code transaction}. If there is no such registry file, a new one is created. + * @throws SystemException if an unexpected failure occurs when creating the registry file + */ + XAResourceRegistry getXAResourceRegistryFile(LocalTransaction transaction) throws SystemException { + XAResourceRegistry registry = (XAResourceRegistry) provider.getResource(transaction, XA_RESOURCE_REGISTRY_KEY); + if (registry != null) + return registry; + registry = new XAResourceRegistryFile(transaction.getXid()); + provider.putResource(transaction, XA_RESOURCE_REGISTRY_KEY, registry); + return registry; + } + + /** + * Returns a list containing all in doubt xa resources. A XAResource is considered in doubt if: + * + * An in doubt resource is no longer considered in doubt if it succeeded to rollback without an exception. + * + * Notice that in doubt xa resources are kept after the server shuts down, guaranteeing that they can eventually be + * recovered, even if in a different server JVM instance than the one that outflowed the resource. This mechanism + * assures proper recovery and abortion of the original in-doubt outflowed resource, that belongs to an external + * remote server. + * + * @return a list of the in doubt xa resources + */ + XAResource[] getInDoubtXAResources() { + try { + recoverInDoubtRegistries(); + } catch (IOException e) { + throw Log.log.unexpectedExceptionOnXAResourceRecovery(e); + } + return inDoubtResources.isEmpty() ? EMPTY_IN_DOUBT_RESOURCES : inDoubtResources.toArray( + new XAResource[inDoubtResources.size()]); + } + + /** + * Recovers closed registries files from file system. All those registries are considered in doubt. + * + * @throws IOException if there is an I/O error when reading the recovered registry files + */ + private void recoverInDoubtRegistries() throws IOException { + final File recoveryDir = xaRecoveryPath.toFile(); + if (!recoveryDir.exists()) { + return; + } + final String[] xaRecoveryFileNames = recoveryDir.list(); + if (xaRecoveryFileNames == null) { + Log.log.listXAResourceRecoveryFilesNull(recoveryDir); + return; + } + for (String xaRecoveryFileName : xaRecoveryFileNames) { + // check if file is not open already + if (!openFilePaths.contains(xaRecoveryFileName)) + new XAResourceRegistryFile(xaRecoveryFileName, provider); + } + } + + + /** + * Represents a single file in the file system that records all outflowed resources per a specific local transaction. + */ + private final class XAResourceRegistryFile extends XAResourceRegistry { + + /** + * Path to the registry file. + */ + @NotNull + private final Path filePath; + + /** + * The file channel, if non-null, it indicates that this registry represents a current, on-going transaction, + * if null, then this registry represents an registry file recovered from file system. + */ + private final FileChannel fileChannel; + + /** + * Keeps track of the XA outflowed resources stored in this registry, see {@link #addResource} and + * {@link #removeResource}. + */ + private final Set resources = Collections.synchronizedSet(new HashSet<>()); + + + /** + * Creates a XA recovery registry for a transaction. This method assumes that there is no file already + * existing for this transaction, and, furthermore, it is not thread safe (the creation of this object is + * already thread protected at the caller). + * + * @param xid the transaction xid + * @throws SystemException if the there was a problem when creating the recovery file in file system + */ + XAResourceRegistryFile(Xid xid) throws SystemException { + xaRecoveryPath.toFile().mkdir(); // create dir if non existent + final String xidString = SimpleXid.of(xid).toHexString(); + this.filePath = xaRecoveryPath.resolve(xidString); + openFilePaths.add(xidString); + try { + fileChannel = FileChannel.open(filePath, StandardOpenOption.APPEND, StandardOpenOption.CREATE_NEW); + fileChannel.lock(); + Log.log.xaResourceRecoveryFileCreated(filePath); + } catch (IOException e) { + throw Log.log.createXAResourceRecoveryFileFailed(filePath, e); + } + } + + /** + * Reload a registry that is in doubt, i.e., the registry is not associated yet with a current + * transaction in this server, but with a transaction of a previous jvm instance that is now + * being recovered. + * This will happen only if the jvm crashes before a transaction with XA outflowed resources is + * fully prepared. In this case, any lines in the registry can correspond to in doubt outflowed + * resources. The goal is to reload those resources so they can be recovered. + * + * @param inDoubtFilePath the file path of the in doubt registry + * @throws IOException if there is an I/O error when realoding the registry file + */ + private XAResourceRegistryFile(String inDoubtFilePath, LocalTransactionProvider provider) throws IOException { + this.filePath = xaRecoveryPath.resolve(inDoubtFilePath); + this.fileChannel = null; // no need to open file channel here + openFilePaths.add(inDoubtFilePath); + loadInDoubtResources(provider.getNodeName()); + Log.log.xaResourceRecoveryRegistryReloaded(filePath); + } + + /** + * {@inheritDoc} + */ + @Override + protected void addResource(XAResource resource, URI uri) throws SystemException { + assert fileChannel != null; + try { + assert fileChannel.isOpen(); + fileChannel.write(ByteBuffer.wrap((uri.toString() + System.lineSeparator()).getBytes(StandardCharsets.UTF_8))); + fileChannel.force(true); + } catch (IOException e) { + throw Log.log.appendXAResourceRecoveryFileFailed(uri, filePath, e); + } + this.resources.add(resource); + Log.log.xaResourceAddedToRecoveryRegistry(uri, filePath); + } + + /** + * {@inheritDoc} + * The registry file is closed and deleted if there are no more resources left. + * + * @throws XAException if there is a problem deleting the registry file + */ + @Override + protected void removeResource(XAResource resource) throws XAException { + if (resources.remove(resource)) { + if (resources.isEmpty()) { + // delete file + try { + if (fileChannel != null) { + fileChannel.close(); + } + Files.delete(filePath); + openFilePaths.remove(filePath.toString()); + } catch (IOException e) { + throw Log.log.deleteXAResourceRecoveryFileFailed(XAException.XAER_RMERR, filePath, resource, e); + } + Log.log.xaResourceRecoveryFileDeleted(filePath); + } + // remove resource from in doubt list, in case the resource was in doubt + inDoubtResources.remove(resource); + } + } + + /** + * {@inheritDoc} + */ + @Override + protected void resourceInDoubt(XAResource resource) { + inDoubtResources.add(resource); + } + + /** + * Loads in doubt resources from recovered registry file. + * + * @throws IOException if an I/O error occurs when reloading the resources from the file + */ + private void loadInDoubtResources(String nodeName) throws IOException { + assert fileChannel == null; + final List uris; + try { + uris = Files.readAllLines(filePath); + } catch (IOException e) { + throw Log.log.readXAResourceRecoveryFileFailed(filePath, e); + } + for (String uriString : uris) { + // adding a line separator at the end of each uri entry results in an extra empty line + if (uriString.isEmpty()) + continue; + final URI uri; + try { + uri = new URI(uriString); + } catch (URISyntaxException e) { + throw Log.log.readURIFromXAResourceRecoveryFileFailed(uriString, filePath, e); + } + final XAResource xaresource = reloadInDoubtResource(uri, nodeName); + inDoubtResources.add(xaresource); + Log.log.xaResourceRecoveredFromRecoveryRegistry(uri, filePath); + } + } + } +} diff --git a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTALocalTransactionProvider.java b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTALocalTransactionProvider.java index f9cbb73..6b5a6db 100644 --- a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTALocalTransactionProvider.java +++ b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTALocalTransactionProvider.java @@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; +import java.nio.file.Path; import java.security.PrivilegedAction; import javax.transaction.HeuristicMixedException; @@ -40,6 +41,7 @@ import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; import org.jboss.tm.ExtendedJBossXATerminator; import org.jboss.tm.TransactionTimeoutConfiguration; +import org.jboss.tm.XAResourceRecoveryRegistry; import org.wildfly.common.annotation.NotNull; import org.wildfly.transaction.client.SimpleXid; import org.wildfly.transaction.client._private.Log; @@ -48,8 +50,9 @@ final class JBossJTALocalTransactionProvider extends JBossLocalTransactionProvid private final Object resourceLock = new Object(); - JBossJTALocalTransactionProvider(final int staleTransactionTime, final ExtendedJBossXATerminator ext, final TransactionManager tm) { - super(ext, staleTransactionTime, tm); + JBossJTALocalTransactionProvider(final int staleTransactionTime, final ExtendedJBossXATerminator ext, final TransactionManager tm, + final XAResourceRecoveryRegistry reg, final Path xaRecoveryPath) { + super(ext, staleTransactionTime, tm, reg, xaRecoveryPath); } int getTransactionManagerTimeout() throws SystemException { diff --git a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTSLocalTransactionProvider.java b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTSLocalTransactionProvider.java index cd610a6..f2db516 100644 --- a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTSLocalTransactionProvider.java +++ b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossJTSLocalTransactionProvider.java @@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; +import java.nio.file.Path; import java.security.PrivilegedAction; import javax.transaction.HeuristicMixedException; @@ -41,6 +42,7 @@ import com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple; import org.jboss.tm.ExtendedJBossXATerminator; import org.jboss.tm.TransactionTimeoutConfiguration; +import org.jboss.tm.XAResourceRecoveryRegistry; import org.wildfly.common.annotation.NotNull; import org.wildfly.transaction.client.SimpleXid; import org.wildfly.transaction.client._private.Log; @@ -49,8 +51,9 @@ final class JBossJTSLocalTransactionProvider extends JBossLocalTransactionProvid private final Object resourceLock = new Object(); - JBossJTSLocalTransactionProvider(final int staleTransactionTime, final ExtendedJBossXATerminator ext, final TransactionManager tm) { - super(ext, staleTransactionTime, tm); + JBossJTSLocalTransactionProvider(final int staleTransactionTime, final ExtendedJBossXATerminator ext, final TransactionManager tm, + final XAResourceRecoveryRegistry reg, final Path xaRecoveryPath) { + super(ext, staleTransactionTime, tm, reg, xaRecoveryPath); } int getTransactionManagerTimeout() throws SystemException { diff --git a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossLocalTransactionProvider.java b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossLocalTransactionProvider.java index 01b896a..2a5634c 100755 --- a/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossLocalTransactionProvider.java +++ b/src/main/java/org/wildfly/transaction/client/provider/jboss/JBossLocalTransactionProvider.java @@ -25,6 +25,7 @@ import static java.lang.Long.signum; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -51,11 +52,14 @@ import org.jboss.tm.ExtendedJBossXATerminator; import org.jboss.tm.ImportedTransaction; import org.jboss.tm.TransactionImportResult; +import org.jboss.tm.XAResourceRecoveryRegistry; import org.wildfly.common.Assert; import org.wildfly.common.annotation.NotNull; import org.wildfly.transaction.client.ImportResult; +import org.wildfly.transaction.client.LocalTransaction; import org.wildfly.transaction.client.SimpleXid; import org.wildfly.transaction.client.XAImporter; +import org.wildfly.transaction.client.XAResourceRegistry; import org.wildfly.transaction.client._private.Log; import org.wildfly.transaction.client.spi.LocalTransactionProvider; import org.wildfly.transaction.client.spi.SubordinateTransactionControl; @@ -74,8 +78,10 @@ public abstract class JBossLocalTransactionProvider implements LocalTransactionP private final XAImporterImpl xi = new XAImporterImpl(); private final ConcurrentSkipListSet timeoutSet = new ConcurrentSkipListSet<>(); private final ConcurrentMap known = new ConcurrentHashMap<>(); + private final FileSystemXAResourceRegistry fileSystemXAResourceRegistry; - JBossLocalTransactionProvider(final ExtendedJBossXATerminator ext, final int staleTransactionTime, final TransactionManager tm) { + JBossLocalTransactionProvider(final ExtendedJBossXATerminator ext, final int staleTransactionTime, final TransactionManager tm, + final XAResourceRecoveryRegistry registry, final Path xaRecoveryDirRelativeToPath) { Assert.checkMinimumParameter("setTransactionTimeout", 0, staleTransactionTime); this.staleTransactionTime = staleTransactionTime; this.ext = Assert.checkNotNullParam("ext", ext); @@ -88,6 +94,8 @@ public abstract class JBossLocalTransactionProvider implements LocalTransactionP // if it fails we ignore, troubles will be adjusted during runtime Log.log.doRecoverFailureOnIntialization(e); } + this.fileSystemXAResourceRegistry = new FileSystemXAResourceRegistry(this, xaRecoveryDirRelativeToPath); + registry.addXAResourceRecovery(fileSystemXAResourceRegistry::getInDoubtXAResources); } /** @@ -147,6 +155,11 @@ public Transaction createNewTransaction(final int timeout) throws SystemExceptio } } + @Override + public XAResourceRegistry getXAResourceRegistry(LocalTransaction transaction) throws SystemException { + return fileSystemXAResourceRegistry.getXAResourceRegistryFile(transaction); + } + abstract int getTransactionManagerTimeout() throws SystemException; public boolean isImported(@NotNull final Transaction transaction) throws IllegalArgumentException { @@ -620,6 +633,8 @@ public static final class Builder { private XATerminator xaTerminator; private TransactionManager transactionManager; private TransactionSynchronizationRegistry transactionSynchronizationRegistry; + private XAResourceRecoveryRegistry xaResourceRecoveryRegistry; + private Path xaRecoveryLogDirRelativeToPath; Builder() { } @@ -728,6 +743,28 @@ public Builder setTransactionSynchronizationRegistry(final TransactionSynchroniz return this; } + /** + * Set the xa resource recovery registry. + * + * @param reg xa resource recovery registry (must not be {@code null}) + */ + public Builder setXAResourceRecoveryRegistry(final XAResourceRecoveryRegistry reg) { + Assert.checkNotNullParam("reg", reg); + this.xaResourceRecoveryRegistry = reg; + return this; + } + + /** + * Set the xa recovery log dir relative to path + * + * @param path the xa recovery log file relative to path (must not be {@code null} + */ + public Builder setXARecoveryLogDirRelativeToPath(Path path) { + Assert.checkNotNullParam("path", path); + this.xaRecoveryLogDirRelativeToPath = path; + return this; + } + /** * Build this provider. If any required properties are {@code null}, an exception is thrown. * @@ -743,10 +780,12 @@ public JBossLocalTransactionProvider build() { Assert.checkMinimumParameter("staleTransactionTime", 0, staleTransactionTime); if (transactionManager instanceof com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple || transactionManager instanceof com.arjuna.ats.jbossatx.jta.TransactionManagerDelegate) { - return new JBossJTALocalTransactionProvider(staleTransactionTime, extendedJBossXATerminator, transactionManager); + return new JBossJTALocalTransactionProvider(staleTransactionTime, extendedJBossXATerminator, + transactionManager, xaResourceRecoveryRegistry, xaRecoveryLogDirRelativeToPath); } else if (transactionManager instanceof com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple || transactionManager instanceof com.arjuna.ats.jbossatx.jts.TransactionManagerDelegate) { - return new JBossJTSLocalTransactionProvider(staleTransactionTime, extendedJBossXATerminator, transactionManager); + return new JBossJTSLocalTransactionProvider(staleTransactionTime, extendedJBossXATerminator, + transactionManager, xaResourceRecoveryRegistry, xaRecoveryLogDirRelativeToPath); } else { throw Log.log.unknownTransactionManagerType(transactionManager.getClass()); } diff --git a/src/main/java/org/wildfly/transaction/client/spi/LocalTransactionProvider.java b/src/main/java/org/wildfly/transaction/client/spi/LocalTransactionProvider.java index 08ba118..59a1742 100644 --- a/src/main/java/org/wildfly/transaction/client/spi/LocalTransactionProvider.java +++ b/src/main/java/org/wildfly/transaction/client/spi/LocalTransactionProvider.java @@ -34,8 +34,10 @@ import org.wildfly.common.Assert; import org.wildfly.common.annotation.NotNull; import org.wildfly.transaction.client.ImportResult; +import org.wildfly.transaction.client.LocalTransaction; import org.wildfly.transaction.client.SimpleXid; import org.wildfly.transaction.client.XAImporter; +import org.wildfly.transaction.client.XAResourceRegistry; import org.wildfly.transaction.client._private.Log; /** @@ -218,6 +220,18 @@ default T getProviderInterface(Transaction transaction, Class providerInt return null; } + /** + * Return the XAResource registry associated with {@code transaction}. If there is no such + * registry yet, one is created. + * + * @param transaction the transaction + * @return the registry associated with {@code transaction}. Can return {@code null} if this + * provider requires no XAResource registry + */ + default XAResourceRegistry getXAResourceRegistry(LocalTransaction transaction) throws SystemException { + return null; + } + /** * An empty provider which does not support new transactions. */