diff --git a/x/wasm/keeper/handler_plugin.go b/x/wasm/keeper/handler_plugin.go
index e812411c51..f8c35b4e5b 100644
--- a/x/wasm/keeper/handler_plugin.go
+++ b/x/wasm/keeper/handler_plugin.go
@@ -111,6 +111,21 @@ func (h SDKMessageHandler) handleSdkMessage(ctx sdk.Context, contractAddr sdk.Ad
 	return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "can't route message %+v", msg)
 }
 
+// callDepthMessageHandler is a wrapper around a Messenger that checks the call depth before dispatching a message.
+type callDepthMessageHandler struct {
+	Messenger
+	MaxCallDepth uint32
+}
+
+func (h callDepthMessageHandler) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
+	ctx, err = checkAndIncreaseCallDepth(ctx, h.MaxCallDepth)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return h.Messenger.DispatchMsg(ctx, contractAddr, contractIBCPortID, msg)
+}
+
 // MessageHandlerChain defines a chain of handlers that are called one by one until it can be handled.
 type MessageHandlerChain struct {
 	handlers []Messenger
diff --git a/x/wasm/keeper/keeper.go b/x/wasm/keeper/keeper.go
index 11474a14ba..d579c917aa 100644
--- a/x/wasm/keeper/keeper.go
+++ b/x/wasm/keeper/keeper.go
@@ -93,6 +93,7 @@ type Keeper struct {
 	queryGasLimit        uint64
 	gasRegister          types.GasRegister
 	maxQueryStackSize    uint32
+	maxCallDepth         uint32
 	acceptedAccountTypes map[reflect.Type]struct{}
 	accountPruner        AccountPruner
 	// propagate gov authZ to sub-messages
@@ -754,6 +755,24 @@ func checkAndIncreaseQueryStackSize(ctx sdk.Context, maxQueryStackSize uint32) (
 	return types.WithQueryStackSize(ctx, queryStackSize), nil
 }
 
+func checkAndIncreaseCallDepth(ctx sdk.Context, maxCallDepth uint32) (sdk.Context, error) {
+	var callDepth uint32 = 0
+	if size, ok := types.CallDepth(ctx); ok {
+		callDepth = size
+	}
+
+	// increase
+	callDepth++
+
+	// did we go too far?
+	if callDepth > maxCallDepth {
+		return sdk.Context{}, types.ErrExceedMaxCallDepth
+	}
+
+	// set updated stack size
+	return types.WithCallDepth(sdk.UnwrapSDKContext(ctx), callDepth), nil
+}
+
 // QueryRaw returns the contract's state for give key. Returns `nil` when key is `nil`.
 func (k Keeper) QueryRaw(ctx sdk.Context, contractAddress sdk.AccAddress, key []byte) []byte {
 	defer telemetry.MeasureSince(time.Now(), "wasm", "contract", "query-raw")
diff --git a/x/wasm/keeper/keeper_cgo.go b/x/wasm/keeper/keeper_cgo.go
index 743ac64de4..ee950133ea 100644
--- a/x/wasm/keeper/keeper_cgo.go
+++ b/x/wasm/keeper/keeper_cgo.go
@@ -48,6 +48,7 @@ func NewKeeper(
 		queryGasLimit:        wasmConfig.SmartQueryGasLimit,
 		gasRegister:          types.NewDefaultWasmGasRegister(),
 		maxQueryStackSize:    types.DefaultMaxQueryStackSize,
+		maxCallDepth:         types.DefaultMaxCallDepth,
 		acceptedAccountTypes: defaultAcceptedAccountTypes,
 		propagateGovAuthorization: map[types.AuthorizationPolicyAction]struct{}{
 			types.AuthZActionInstantiate: {},
@@ -59,6 +60,8 @@ func NewKeeper(
 	for _, o := range preOpts {
 		o.apply(keeper)
 	}
+	// always wrap the messenger, even if it was replaced by an option
+	keeper.messenger = callDepthMessageHandler{keeper.messenger, keeper.maxCallDepth}
 	// only set the wasmvm if no one set this in the options
 	// NewVM does a lot, so better not to create it and silently drop it.
 	if keeper.wasmVM == nil {
diff --git a/x/wasm/keeper/options.go b/x/wasm/keeper/options.go
index 7ccf7d9e52..8234c7d0cb 100644
--- a/x/wasm/keeper/options.go
+++ b/x/wasm/keeper/options.go
@@ -158,6 +158,12 @@ func WithMaxQueryStackSize(m uint32) Option {
 	})
 }
 
+func WithMaxCallDepth(m uint32) Option {
+	return optsFn(func(k *Keeper) {
+		k.maxCallDepth = m
+	})
+}
+
 // WithAcceptedAccountTypesOnContractInstantiation sets the accepted account types. Account types of this list won't be overwritten or cause a failure
 // when they exist for an address on contract instantiation.
 //
diff --git a/x/wasm/keeper/options_test.go b/x/wasm/keeper/options_test.go
index b26e1af7c2..9dd65af7d8 100644
--- a/x/wasm/keeper/options_test.go
+++ b/x/wasm/keeper/options_test.go
@@ -55,8 +55,9 @@ func TestConstructorOptions(t *testing.T) {
 		"message handler": {
 			srcOpt: WithMessageHandler(&wasmtesting.MockMessageHandler{}),
 			verify: func(t *testing.T, k Keeper) {
-				t.Helper()
-				assert.IsType(t, &wasmtesting.MockMessageHandler{}, k.messenger)
+				require.IsType(t, callDepthMessageHandler{}, k.messenger)
+				messenger, _ := k.messenger.(callDepthMessageHandler)
+				assert.IsType(t, &wasmtesting.MockMessageHandler{}, messenger.Messenger)
 			},
 		},
 		"query plugins": {
@@ -68,7 +69,7 @@ func TestConstructorOptions(t *testing.T) {
 		},
 		"message handler decorator": {
 			srcOpt: WithMessageHandlerDecorator(func(old Messenger) Messenger {
-				require.IsType(t, &MessageHandlerChain{}, old)
+				require.IsType(t, callDepthMessageHandler{}, old)
 				return &wasmtesting.MockMessageHandler{}
 			}),
 			verify: func(t *testing.T, k Keeper) {
@@ -111,13 +112,19 @@ func TestConstructorOptions(t *testing.T) {
 				assert.Equal(t, uint64(2), costCanonical)
 			},
 		},
-		"max recursion query limit": {
+		"max query recursion limit": {
 			srcOpt: WithMaxQueryStackSize(1),
 			verify: func(t *testing.T, k Keeper) {
 				t.Helper()
 				assert.IsType(t, uint32(1), k.maxQueryStackSize)
 			},
 		},
+		"max message recursion limit": {
+			srcOpt: WithMaxCallDepth(1),
+			verify: func(t *testing.T, k Keeper) {
+				assert.IsType(t, uint32(1), k.maxCallDepth)
+			},
+		},
 		"accepted account types": {
 			srcOpt: WithAcceptedAccountTypesOnContractInstantiation(&authtypes.BaseAccount{}, &vestingtypes.ContinuousVestingAccount{}),
 			verify: func(t *testing.T, k Keeper) {
diff --git a/x/wasm/types/context.go b/x/wasm/types/context.go
index 006e397624..7a251853a7 100644
--- a/x/wasm/types/context.go
+++ b/x/wasm/types/context.go
@@ -16,6 +16,8 @@ const (
 	contextKeySubMsgAuthzPolicy = iota
 	// gas register
 	contextKeyGasRegister = iota
+
+	contextKeyCallDepth contextKey = iota
 )
 
 // WithTXCounter stores a transaction counter value in the context
@@ -41,6 +43,15 @@ func QueryStackSize(ctx sdk.Context) (uint32, bool) {
 	return val, ok
 }
 
+func WithCallDepth(ctx sdk.Context, counter uint32) sdk.Context {
+	return ctx.WithValue(contextKeyCallDepth, counter)
+}
+
+func CallDepth(ctx sdk.Context) (uint32, bool) {
+	val, ok := ctx.Value(contextKeyCallDepth).(uint32)
+	return val, ok
+}
+
 // WithSubMsgAuthzPolicy stores the authorization policy for submessages into the context returned
 func WithSubMsgAuthzPolicy(ctx sdk.Context, policy AuthorizationPolicy) sdk.Context {
 	if policy == nil {
diff --git a/x/wasm/types/errors.go b/x/wasm/types/errors.go
index 58aa5caf1a..704067f026 100644
--- a/x/wasm/types/errors.go
+++ b/x/wasm/types/errors.go
@@ -86,6 +86,11 @@ var (
 	ErrNoSuchCodeFn = WasmVMFlavouredErrorFactory(errorsmod.Register(DefaultCodespace, 28, "no such code"),
 		func(id uint64) error { return wasmvmtypes.NoSuchCode{CodeID: id} },
 	)
+
+	// code 29 reserved for wasmd 0.50+
+
+	// ErrExceedMaxCallDepth error if max message stack size is exceeded
+	ErrExceedMaxCallDepth = errorsmod.Register(DefaultCodespace, 30, "max call depth exceeded")
 )
 
 // WasmVMErrorable mapped error type in wasmvm and are not redacted
diff --git a/x/wasm/types/wasmer_engine.go b/x/wasm/types/wasmer_engine.go
index d3b465de77..ed3f93a4d6 100644
--- a/x/wasm/types/wasmer_engine.go
+++ b/x/wasm/types/wasmer_engine.go
@@ -7,9 +7,11 @@ import (
 	sdk "github.com/cosmos/cosmos-sdk/types"
 )
 
-// DefaultMaxQueryStackSize maximum size of the stack of contract instances doing queries
+// DefaultMaxQueryStackSize maximum size of the stack of recursive queries a contract can make
 const DefaultMaxQueryStackSize uint32 = 10
 
+const DefaultMaxCallDepth uint32 = 500
+
 // WasmEngine defines the WASM contract runtime engine.
 type WasmEngine interface {
 	// Create will compile the wasm code, and store the resulting pre-compile