@@ -143,6 +143,18 @@ const onEndpointErrorChannel = dc.channel('quic.endpoint.error');
143
143
const onEndpointBusyChangeChannel = dc . channel ( 'quic.endpoint.busy.change' ) ;
144
144
const onEndpointClientSessionChannel = dc . channel ( 'quic.session.created.client' ) ;
145
145
const onEndpointServerSessionChannel = dc . channel ( 'quic.session.created.server' ) ;
146
+ const onSessionOpenStreamChannel = dc . channel ( 'quic.session.open.stream' ) ;
147
+ const onSessionReceivedStreamChannel = dc . channel ( 'quic.session.received.stream' ) ;
148
+ const onSessionSendDatagramChannel = dc . channel ( 'quic.session.send.datagram' ) ;
149
+ const onSessionUpdateKeyChannel = dc . channel ( 'quic.session.update.key' ) ;
150
+ const onSessionClosingChannel = dc . channel ( 'quic.session.closing' ) ;
151
+ const onSessionClosedChannel = dc . channel ( 'quic.session.closed' ) ;
152
+ const onSessionReceiveDatagramChannel = dc . channel ( 'quic.session.receive.datagram' ) ;
153
+ const onSessionReceiveDatagramStatusChannel = dc . channel ( 'quic.session.receive.datagram.status' ) ;
154
+ const onSessionPathValidationChannel = dc . channel ( 'quic.session.path.validation' ) ;
155
+ const onSessionTicketChannel = dc . channel ( 'quic.session.ticket' ) ;
156
+ const onSessionVersionNegotiationChannel = dc . channel ( 'quic.session.version.negotiation' ) ;
157
+ const onSessionHandshakeChannel = dc . channel ( 'quic.session.handshake' ) ;
146
158
147
159
/**
148
160
* @typedef {import('../socketaddress.js').SocketAddress } SocketAddress
@@ -873,6 +885,13 @@ class QuicSession {
873
885
const stream = new QuicStream ( kPrivateConstructor , this . #streamConfig, handle ,
874
886
this , 0 /* Bidirectional */ ) ;
875
887
this . #streams. add ( stream ) ;
888
+
889
+ if ( onSessionOpenStreamChannel . hasSubscribers ) {
890
+ onSessionOpenStreamChannel . publish ( {
891
+ stream,
892
+ session : this ,
893
+ } ) ;
894
+ }
876
895
return stream ;
877
896
}
878
897
@@ -893,6 +912,14 @@ class QuicSession {
893
912
const stream = new QuicStream ( kPrivateConstructor , this . #streamConfig, handle ,
894
913
this , 1 /* Unidirectional */ ) ;
895
914
this . #streams. add ( stream ) ;
915
+
916
+ if ( onSessionOpenStreamChannel . hasSubscribers ) {
917
+ onSessionOpenStreamChannel . publish ( {
918
+ stream,
919
+ session : this ,
920
+ } ) ;
921
+ }
922
+
896
923
return stream ;
897
924
}
898
925
@@ -923,7 +950,17 @@ class QuicSession {
923
950
datagram . byteOffset ,
924
951
datagram . byteLength ) ;
925
952
}
926
- return this . #handle. sendDatagram ( datagram ) ;
953
+ const id = this . #handle. sendDatagram ( datagram ) ;
954
+
955
+ if ( onSessionSendDatagramChannel . hasSubscribers ) {
956
+ onSessionSendDatagramChannel . publish ( {
957
+ id,
958
+ length : datagram . byteLength ,
959
+ session : this ,
960
+ } ) ;
961
+ }
962
+
963
+ return id ;
927
964
}
928
965
929
966
/**
@@ -934,6 +971,11 @@ class QuicSession {
934
971
throw new ERR_INVALID_STATE ( 'Session is closed' ) ;
935
972
}
936
973
this . #handle. updateKey ( ) ;
974
+ if ( onSessionUpdateKeyChannel . hasSubscribers ) {
975
+ onSessionUpdateKeyChannel . publish ( {
976
+ session : this ,
977
+ } ) ;
978
+ }
937
979
}
938
980
939
981
/**
@@ -950,6 +992,11 @@ class QuicSession {
950
992
if ( ! this . #isClosedOrClosing) {
951
993
this . #isPendingClose = true ;
952
994
this . #handle?. gracefulClose ( ) ;
995
+ if ( onSessionClosingChannel . hasSubscribers ) {
996
+ onSessionClosingChannel . publish ( {
997
+ session : this ,
998
+ } ) ;
999
+ }
953
1000
}
954
1001
return this . closed ;
955
1002
}
@@ -1023,6 +1070,12 @@ class QuicSession {
1023
1070
this . #handle. destroy ( ) ;
1024
1071
this . #handle = undefined ;
1025
1072
1073
+ if ( onSessionClosedChannel . hasSubscribers ) {
1074
+ onSessionClosedChannel . publish ( {
1075
+ session : this ,
1076
+ } ) ;
1077
+ }
1078
+
1026
1079
return this . closed ;
1027
1080
}
1028
1081
@@ -1069,6 +1122,14 @@ class QuicSession {
1069
1122
assert ( this . #ondatagram, 'Unexpected datagram event' ) ;
1070
1123
if ( this . destroyed ) return ;
1071
1124
this . #ondatagram( u8 , early ) ;
1125
+
1126
+ if ( onSessionReceiveDatagramChannel . hasSubscribers ) {
1127
+ onSessionReceiveDatagramChannel . publish ( {
1128
+ length : u8 . byteLength ,
1129
+ early,
1130
+ session : this ,
1131
+ } ) ;
1132
+ }
1072
1133
}
1073
1134
1074
1135
/**
@@ -1080,6 +1141,14 @@ class QuicSession {
1080
1141
// The ondatagramstatus callback may not have been specified. That's ok.
1081
1142
// We'll just ignore the event in that case.
1082
1143
this . #ondatagramstatus?. ( id , status ) ;
1144
+
1145
+ if ( onSessionReceiveDatagramStatusChannel . hasSubscribers ) {
1146
+ onSessionReceiveDatagramStatusChannel . publish ( {
1147
+ id,
1148
+ status,
1149
+ session : this ,
1150
+ } ) ;
1151
+ }
1083
1152
}
1084
1153
1085
1154
/**
@@ -1098,6 +1167,18 @@ class QuicSession {
1098
1167
if ( this . destroyed ) return ;
1099
1168
this . #onpathvalidation( result , newLocalAddress , newRemoteAddress ,
1100
1169
oldLocalAddress , oldRemoteAddress , preferredAddress ) ;
1170
+
1171
+ if ( onSessionPathValidationChannel . hasSubscribers ) {
1172
+ onSessionPathValidationChannel . publish ( {
1173
+ result,
1174
+ newLocalAddress,
1175
+ newRemoteAddress,
1176
+ oldLocalAddress,
1177
+ oldRemoteAddress,
1178
+ preferredAddress,
1179
+ session : this ,
1180
+ } ) ;
1181
+ }
1101
1182
}
1102
1183
1103
1184
/**
@@ -1109,6 +1190,12 @@ class QuicSession {
1109
1190
assert ( this . #onsessionticket, 'Unexpected session ticket event' ) ;
1110
1191
if ( this . destroyed ) return ;
1111
1192
this . #onsessionticket( ticket ) ;
1193
+ if ( onSessionTicketChannel . hasSubscribers ) {
1194
+ onSessionTicketChannel . publish ( {
1195
+ ticket,
1196
+ session : this ,
1197
+ } ) ;
1198
+ }
1112
1199
}
1113
1200
1114
1201
/**
@@ -1123,6 +1210,15 @@ class QuicSession {
1123
1210
if ( this . destroyed ) return ;
1124
1211
this . #onversionnegotiation( version , requestedVersions , supportedVersions ) ;
1125
1212
this . destroy ( new ERR_QUIC_VERSION_NEGOTIATION_ERROR ( ) ) ;
1213
+
1214
+ if ( onSessionVersionNegotiationChannel . hasSubscribers ) {
1215
+ onSessionVersionNegotiationChannel . publish ( {
1216
+ version,
1217
+ requestedVersions,
1218
+ supportedVersions,
1219
+ session : this ,
1220
+ } ) ;
1221
+ }
1126
1222
}
1127
1223
1128
1224
/**
@@ -1141,6 +1237,19 @@ class QuicSession {
1141
1237
// We'll just ignore the event in that case.
1142
1238
this . #onhandshake?. ( sni , alpn , cipher , cipherVersion , validationErrorReason ,
1143
1239
validationErrorCode , earlyDataAccepted ) ;
1240
+
1241
+ if ( onSessionHandshakeChannel . hasSubscribers ) {
1242
+ onSessionHandshakeChannel . publish ( {
1243
+ sni,
1244
+ alpn,
1245
+ cipher,
1246
+ cipherVersion,
1247
+ validationErrorReason,
1248
+ validationErrorCode,
1249
+ earlyDataAccepted,
1250
+ session : this ,
1251
+ } ) ;
1252
+ }
1144
1253
}
1145
1254
1146
1255
/**
@@ -1161,6 +1270,13 @@ class QuicSession {
1161
1270
this . #streams. add ( stream ) ;
1162
1271
1163
1272
this . #onstream( stream ) ;
1273
+
1274
+ if ( onSessionReceivedStreamChannel . hasSubscribers ) {
1275
+ onSessionReceivedStreamChannel . publish ( {
1276
+ stream,
1277
+ session : this ,
1278
+ } ) ;
1279
+ }
1164
1280
}
1165
1281
1166
1282
[ kRemoveStream ] ( stream ) {
@@ -1442,6 +1558,7 @@ class QuicEndpoint {
1442
1558
#newSession( handle ) {
1443
1559
const session = new QuicSession ( kPrivateConstructor , this . #sessionConfig, handle , this ) ;
1444
1560
this . #sessions. add ( session ) ;
1561
+ return session ;
1445
1562
}
1446
1563
1447
1564
/**
0 commit comments