Skip to content

Commit

Permalink
Merge pull request #40 from fl4via/WFTC-38
Browse files Browse the repository at this point in the history
[WFTC-38] Add a XAResourceRegistry to record in a file all in doubt r…
  • Loading branch information
dmlloyd authored Mar 9, 2018
2 parents 2c511e5 + 149d6b1 commit 476840c
Show file tree
Hide file tree
Showing 11 changed files with 584 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.wildfly.common.Assert;
import org.wildfly.transaction.client._private.Log;
import org.wildfly.transaction.client.spi.LocalTransactionProvider;

/**
* A transaction from a local transaction provider.
Expand Down Expand Up @@ -134,6 +135,10 @@ void unimportBacking() {
}
}

LocalTransactionProvider getProvider() {
return owner.getProvider();
}

public void setRollbackOnly() throws IllegalStateException, SystemException {
transaction.setRollbackOnly();
}
Expand Down
43 changes: 27 additions & 16 deletions src/main/java/org/wildfly/transaction/client/SimpleXid.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,31 @@ public int compareTo(final SimpleXid o) {
return res;
}

public String toHexString() {
StringBuilder b = new StringBuilder();
toHexString(b);
return b.toString();
}

private void toHexString(StringBuilder builder) {
builder.append(Integer.toHexString(formatId)).append(':');
for (final byte x : globalId) {
final int v = x & 0xff;
if (v < 16) {
builder.append('0');
}
builder.append(Integer.toHexString(v));
}
builder.append(':');
for (final byte x : branchId) {
final int v = x & 0xff;
if (v < 16) {
builder.append('0');
}
builder.append(Integer.toHexString(v));
}
}

private static int compareByteArrays(byte[] a1, byte[] a2) {
final int l1 = a1.length;
final int l2 = a2.length;
Expand All @@ -130,22 +155,8 @@ private static int compareByteArrays(byte[] a1, byte[] a2) {

public String toString() {
StringBuilder b = new StringBuilder();
b.append("XID [").append(Integer.toHexString(formatId)).append(':');
for (final byte x : globalId) {
final int v = x & 0xff;
if (v < 16) {
b.append('0');
}
b.append(Integer.toHexString(v));
}
b.append(':');
for (final byte x : branchId) {
final int v = x & 0xff;
if (v < 16) {
b.append('0');
}
b.append(Integer.toHexString(v));
}
b.append("XID [");
toHexString(b);
b.append(']');
return b.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,30 @@ final class SubordinateXAResource implements XAResource, XARecoverable, Serializ

private final URI location;
private final String parentName;
private final XAResourceRegistry resourceRegistry;
private volatile int timeout = LocalTransactionContext.DEFAULT_TXN_TIMEOUT;
private long startTime = 0L;
private volatile Xid xid;
private int capturedTimeout;

private final AtomicInteger stateRef = new AtomicInteger(0);

SubordinateXAResource(final URI location, final String parentName) {
SubordinateXAResource(final URI location, final String parentName, XAResourceRegistry recoveryRegistry) {
this.location = location;
this.parentName = parentName;
this.resourceRegistry = recoveryRegistry;
}

SubordinateXAResource(final URI location, final String parentName, final int flags, XAResourceRegistry recoveryRegistry) {
this(location, parentName, recoveryRegistry);
stateRef.set(flags);
}

SubordinateXAResource(final URI location, final int flags, final String parentName) {
this.location = location;
this.parentName = parentName;
stateRef.set(flags);
this.resourceRegistry = null;
}

Xid getXid() {
Expand Down Expand Up @@ -136,15 +144,41 @@ public void beforeCompletion(final Xid xid) throws XAException {
}

public int prepare(final Xid xid) throws XAException {
return commitToEnlistment() ? lookup(xid).prepare() : XA_RDONLY;
final int result;
try {
result = commitToEnlistment() ? lookup(xid).prepare() : XA_RDONLY;
} catch (XAException | RuntimeException exception) {
if (resourceRegistry != null)
resourceRegistry.resourceInDoubt(this);
throw exception;
}
if (resourceRegistry != null)
resourceRegistry.removeResource(this);
return result;
}

public void commit(final Xid xid, final boolean onePhase) throws XAException {
if (commitToEnlistment()) lookup(xid).commit(onePhase);
try {
if (commitToEnlistment()) lookup(xid).commit(onePhase);
} catch (XAException | RuntimeException exception) {
if (onePhase && resourceRegistry != null)
resourceRegistry.resourceInDoubt(this);
throw exception;
}
if (onePhase && resourceRegistry != null)
resourceRegistry.removeResource(this);
}

public void rollback(final Xid xid) throws XAException {
if (commitToEnlistment()) lookup(xid).rollback();
try {
if (commitToEnlistment()) lookup(xid).rollback();
} catch (XAException | RuntimeException e) {
if (resourceRegistry != null)
resourceRegistry.resourceInDoubt(this);
throw e;
}
if (resourceRegistry != null)
resourceRegistry.removeResource(this);
}

public void forget(final Xid xid) throws XAException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ SubordinateXAResource getOrEnlist(final URI location, final String parentName) t
if (xaResource != null) {
return xaResource;
}
xaResource = new SubordinateXAResource(location, parentName);
final XAResourceRegistry resourceRegistry = transaction.getProvider().getXAResourceRegistry(transaction);
xaResource = new SubordinateXAResource(location, parentName, resourceRegistry);
if (resourceRegistry != null) {
resourceRegistry.addResource(xaResource, location);
}
if (! transaction.enlistResource(xaResource)) {
throw Log.log.couldNotEnlist();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2018 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wildfly.transaction.client;


import javax.transaction.SystemException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import java.net.URI;

import static org.wildfly.transaction.client.OutflowHandleManager.FL_COMMITTED;
import static org.wildfly.transaction.client.OutflowHandleManager.FL_CONFIRMED;

/**
* Registry that keeps track of outflowed resources info for a specific transaction.
*
* Used for recovery of those resources when they enter in doubt state.
*
* @author Flavia Rainone
*/
public abstract class XAResourceRegistry {

/**
* Adds a XA resource to this registry.
*
* @param resource the resource
* @param uri the resource URI location
* @throws SystemException if there is a problem recording this resource
*/
protected abstract void addResource(XAResource resource, URI uri) throws SystemException;

/**
* Removes a XA resource from this registry.
*
* @param resource the resource
* @throws XAException if there is a problem deleting this resource
*/
protected abstract void removeResource(XAResource resource) throws XAException;

/**
* Flags the previously added resource as in doubt, meaning that it failed to complete prepare, rollback or commit.
* It can be invoked more than once for the same resource if that resource repeatedly fails to perform those
* operations, such as a resource that first fails to prepare, and then fails to rollback.
*
* @param resource the resource
*/
protected abstract void resourceInDoubt(XAResource resource);

/**
* Reloads an in doubt resource, recreating a previously lost remote XA resource object. This method
* must be invoked to recreate in doubt resources after a server shutdown or crash.
*
* @param uri the URI where the outflowed resource is located
* @param nodeName the node name of the resource
* @return a newly-created resource representing a previously lost XA resource that is in doubt
*/
protected XAResource reloadInDoubtResource(URI uri, String nodeName) {
return new SubordinateXAResource(uri, nodeName, FL_COMMITTED | FL_CONFIRMED, this);
}
}
48 changes: 48 additions & 0 deletions src/main/java/org/wildfly/transaction/client/_private/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.wildfly.transaction.client._private;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.security.Permission;
import java.util.ServiceConfigurationError;

Expand Down Expand Up @@ -58,6 +61,13 @@ public interface Log extends BasicLogger {
@Message(value = "Subordinate XAResource at %s")
String subordinateXaResource(URI location);

// Warn

@LogMessage(level = Logger.Level.WARN)
@Message(value = "Unknown I/O error when listing xa resource recovery files in %s (File.list() returned null)")
void listXAResourceRecoveryFilesNull(File dir);


// Debug

@LogMessage(level = Logger.Level.DEBUG)
Expand All @@ -82,6 +92,26 @@ public interface Log extends BasicLogger {
@Message(value = "Failure on running doRecover during initialization")
void doRecoverFailureOnIntialization(@Cause Throwable e);

@LogMessage(level = Logger.Level.TRACE)
@Message (value = "Created xa resource recovery file: %s")
void xaResourceRecoveryFileCreated(Path path);

@LogMessage(level = Logger.Level.TRACE)
@Message (value = "Deleted xa resource recovery file: %s")
void xaResourceRecoveryFileDeleted(Path path);

@LogMessage(level = Logger.Level.TRACE)
@Message(value = "Reloaded xa resource recovery registry file: %s")
void xaResourceRecoveryRegistryReloaded(Path filePath);

@LogMessage(level = Logger.Level.TRACE)
@Message(value = "Added resource (%s) to xa resource recovery registry %s")
void xaResourceAddedToRecoveryRegistry(URI uri, Path filePath);

@LogMessage(level = Logger.Level.TRACE)
@Message(value = "Recovered in doubt xa resource (%s) from xa resource recovery registry %s")
void xaResourceRecoveredFromRecoveryRegistry(URI uri, Path filePath);

// Regular messages

@Message(id = 0, value = "No transaction associated with the current thread")
Expand Down Expand Up @@ -357,4 +387,22 @@ public interface Log extends BasicLogger {

@Message(id = 90, value = "Cannot assign location \"%s\" to transaction because it is already located at \"%s\"")
IllegalStateException locationAlreadyInitialized(URI newLocation, URI oldLocation);

@Message(id = 91, value = "Failed to create xa resource recovery file: %s")
SystemException createXAResourceRecoveryFileFailed(Path filePath, @Cause IOException e);

@Message(id = 92, value = "Failed to append xa resource (%s) to xa recovery file: %s")
SystemException appendXAResourceRecoveryFileFailed(URI uri, Path filePath, @Cause IOException e);

@Message(id = 93, value = "Failed to delete xa recovery registry file %s on removal of %s")
XAException deleteXAResourceRecoveryFileFailed(@Field int errorCode, Path filePath, XAResource resource, @Cause IOException e);

@Message(id = 94, value = "Failed to read xa resource recovery file %s")
IOException readXAResourceRecoveryFileFailed(Path filePath, @Cause IOException e);

@Message(id = 95, value = "Failed to read URI '%s' from xa resource recovery file %s")
IOException readURIFromXAResourceRecoveryFileFailed(String uriString, Path filePath, @Cause URISyntaxException e);

@Message(id = 96, value = "Unexpected exception on XA recovery")
IllegalStateException unexpectedExceptionOnXAResourceRecovery(@Cause IOException e);
}
Loading

0 comments on commit 476840c

Please # to comment.