2828import inner from 'cloudflare-internal:sockets' ;
2929
3030import {
31+ AbortError ,
3132 ERR_INVALID_ARG_VALUE ,
3233 ERR_INVALID_ARG_TYPE ,
3334 ERR_MISSING_ARGS ,
@@ -45,11 +46,20 @@ import {
4546 validateInt32 ,
4647 validateNumber ,
4748 validatePort ,
49+ validateObject ,
4850} from 'node-internal:validators' ;
4951
5052import { isUint8Array , isArrayBufferView } from 'node-internal:internal_types' ;
5153import { Duplex } from 'node-internal:streams_duplex' ;
5254import { Buffer } from 'node-internal:internal_buffer' ;
55+ import type {
56+ IpcSocketConnectOpts ,
57+ SocketConnectOpts ,
58+ TcpSocketConnectOpts ,
59+ AddressInfo ,
60+ Socket as _Socket ,
61+ OnReadOpts ,
62+ } from 'node:net' ;
5363
5464const kLastWriteQueueSize = Symbol ( 'kLastWriteQueueSize' ) ;
5565const kTimeout = Symbol ( 'kTimeout' ) ;
@@ -100,23 +110,18 @@ export type SocketOptions = {
100110 writableObjectMode ?: boolean ;
101111 keepAliveInitialDelay ?: number ;
102112 fd ?: number ;
103- handle ?: VoidFunction ;
113+ handle ?: Socket [ '_handle' ] ;
104114 noDelay ?: boolean ;
105115 keepAlive ?: boolean ;
106- allowHalfOpen ?: boolean ;
116+ allowHalfOpen ?: boolean | undefined ;
107117 emitClose ?: boolean ;
108- signal ?: AbortSignal ;
109- onread ?: { callback ?: ( ) => Uint8Array ; buffer ?: Uint8Array } ;
118+ signal ?: AbortSignal | undefined ;
119+ onread ?:
120+ | ( { callback ?: ( ) => Uint8Array ; buffer ?: Uint8Array } & OnReadOpts )
121+ | null
122+ | undefined ;
110123} ;
111124
112- import type {
113- IpcSocketConnectOpts ,
114- SocketConnectOpts ,
115- TcpSocketConnectOpts ,
116- AddressInfo ,
117- Socket as _Socket ,
118- } from 'node:net' ;
119-
120125export function BlockList ( ) : void {
121126 throw new Error ( 'BlockList is not implemented' ) ;
122127}
@@ -129,42 +134,43 @@ export function Server(): void {
129134 throw new Error ( 'Server is not implemented' ) ;
130135}
131136
132- export interface Socket extends _Socket {
133- timeout : number ;
134- connecting : boolean ;
135- _aborted : boolean ;
136- _hadError : boolean ;
137- _parent : null | Socket ;
138- _host : null | string ;
139- _peername : null | string ;
140- _getsockname ( ) :
137+ // @ts -expect-error TS2323 Redeclare error.
138+ export declare class Socket extends _Socket {
139+ public timeout : number ;
140+ public connecting : boolean ;
141+ public _aborted : boolean ;
142+ public _hadError : boolean ;
143+ public _parent : null | Socket ;
144+ public _host : null | string ;
145+ public _peername : null | string ;
146+ public _getsockname ( ) :
141147 | { }
142148 | {
143149 address ?: string ;
144150 port ?: number ;
145151 family ?: string ;
146152 } ;
147- [ kLastWriteQueueSize ] : number | null | undefined ;
148- [ kTimeout ] : Socket | null | undefined ;
149- [ kBuffer ] : null | boolean | Uint8Array ;
150- [ kBufferCb ] :
153+ public [ kLastWriteQueueSize ] : number | null | undefined ;
154+ public [ kTimeout ] : Socket | null | undefined ;
155+ public [ kBuffer ] : null | boolean | Uint8Array ;
156+ public [ kBufferCb ] :
151157 | null
152158 | undefined
153159 | ( ( len ?: number , buf ?: Buffer ) => boolean | Uint8Array ) ;
154- [ kBufferGen ] : null | ( ( ) => undefined | Uint8Array ) ;
155- [ kSocketInfo ] : null | {
160+ public [ kBufferGen ] : null | ( ( ) => undefined | Uint8Array ) ;
161+ public [ kSocketInfo ] : null | {
156162 address ?: string ;
157163 port ?: number ;
158164 family ?: number | string ;
159165 remoteAddress ?: Record < string , unknown > ;
160166 } ;
161- [ kBytesRead ] : number ;
162- [ kBytesWritten ] : number ;
163- _closeAfterHandlingError : boolean ;
164- _handle : null | {
167+ public [ kBytesRead ] : number ;
168+ public [ kBytesWritten ] : number ;
169+ public _closeAfterHandlingError : boolean ;
170+ public _handle : null | {
165171 writeQueueSize ?: number ;
166172 lastWriteQueueSize ?: number ;
167- reading ?: boolean ;
173+ reading ?: boolean | undefined ;
168174 bytesRead : number ;
169175 bytesWritten : number ;
170176 socket : ReturnType < typeof inner . connect > ;
@@ -177,29 +183,31 @@ export interface Socket extends _Socket {
177183 write ( data : string | ArrayBufferView ) : Promise < void > ;
178184 } ;
179185 } ;
180- _sockname ?: null | AddressInfo ;
181- _onTimeout ( ) : void ;
182- _unrefTimer ( ) : void ;
183- _writeGeneric (
186+ public _sockname ?: null | AddressInfo ;
187+ public _onTimeout ( ) : void ;
188+ public _unrefTimer ( ) : void ;
189+ public _writeGeneric (
184190 writev : boolean ,
185191 data : { chunk : string | ArrayBufferView ; encoding : string } [ ] ,
186192 encoding : string ,
187193 cb : ( err ?: Error ) => void
188194 ) : void ;
189- _final ( cb : ( err ?: Error ) => void ) : void ;
190- _read ( n : number ) : void ;
191- _reset ( ) : void ;
192- _getpeername ( ) : Record < string , unknown > ;
193- _writableState : null | unknown [ ] ;
194- }
195-
196- export interface SocketConstructor {
197- ( this : unknown , options ?: SocketOptions ) : Socket ;
198- new ( options ?: SocketOptions ) : Socket ;
199- prototype : Socket ;
195+ public _final ( cb : ( err ?: Error ) => void ) : void ;
196+ public _read ( n : number ) : void ;
197+ public _reset ( ) : void ;
198+ public _getpeername ( ) : Record < string , unknown > ;
199+ public _writableState : null | unknown [ ] ;
200+
201+ // Defined by TLSSocket
202+ public encrypted ?: boolean ;
203+ public _finishInit ( ) : void ;
204+
205+ public constructor ( options ?: SocketOptions ) ;
206+ public prototype : Socket ;
200207}
201208
202- export const Socket = function ( this : Socket , options ?: SocketOptions ) : Socket {
209+ // @ts -expect-error TS2323 Redeclare error.
210+ export function Socket ( this : Socket , options ?: SocketOptions ) : Socket {
203211 if ( ! ( this instanceof Socket ) ) {
204212 return new Socket ( options ) ;
205213 }
@@ -236,11 +244,7 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {
236244 options = { ...options } ;
237245 }
238246
239- if ( options . handle ) {
240- // We are not supporting the options.handle option for now. This is the
241- // option that allows users to pass in a handle to an existing socket.
242- throw new ERR_OPTION_NOT_IMPLEMENTED ( 'options.handle' ) ;
243- } else if ( options . fd !== undefined ) {
247+ if ( options . fd !== undefined ) {
244248 // We are not supporting the options.fd option for now. This is the option
245249 // that allows users to pass in a file descriptor to an existing socket.
246250 // Workers doesn't have file descriptors and does not use them in any way.
@@ -287,10 +291,15 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {
287291
288292 Duplex . call ( this , options ) ;
289293
294+ if ( options . handle ) {
295+ validateObject ( options . handle , 'options.handle' ) ;
296+ this . _handle = options . handle ;
297+ }
298+
290299 this . once ( 'end' , onReadableStreamEnd ) ;
291300
292301 if ( options . signal ) {
293- addClientAbortSignalOption ( this , options ) ;
302+ addClientAbortSignalOption ( this , options . signal ) ;
294303 }
295304
296305 const onread = options . onread ;
@@ -319,7 +328,7 @@ export const Socket = function (this: Socket, options?: SocketOptions): Socket {
319328 }
320329
321330 return this ;
322- } as SocketConstructor ;
331+ }
323332
324333Object . setPrototypeOf ( Socket . prototype , Duplex . prototype ) ;
325334Object . setPrototypeOf ( Socket , Duplex ) ;
@@ -691,26 +700,39 @@ Socket.prototype._destroy = function (
691700// ======================================================================================
692701// Connection
693702
694- Socket . prototype . connect = function ( this : Socket , ...args : unknown [ ] ) : Socket {
703+ // @ts -expect-error TS2322 Type inconsistencies between types/node
704+ Socket . prototype . connect = function (
705+ this : Socket ,
706+ ...args : unknown [ ]
707+ ) : Socket | undefined {
708+ let normalized ;
709+ // @ts -expect-error TS7015 Required not to overcomplicate types
710+ if ( Array . isArray ( args [ 0 ] ) && args [ 0 ] [ normalizedArgsSymbol ] ) {
711+ normalized = args [ 0 ] ;
712+ } else {
713+ normalized = _normalizeArgs ( args ) ;
714+ }
715+ const options = normalized [ 0 ] as TcpSocketConnectOpts & IpcSocketConnectOpts ;
716+ const cb = normalized [ 1 ] as ( ( err : Error | null ) => void ) | null ;
717+
695718 if ( this . connecting ) {
696719 throw new ERR_SOCKET_CONNECTING ( ) ;
697720 }
721+ if ( this . _aborted ) {
722+ if ( cb ) {
723+ cb ( new AbortError ( ) ) ;
724+ } else {
725+ throw new AbortError ( ) ;
726+ }
727+ return undefined ;
728+ }
698729 // TODO(later): In Node.js a Socket instance can be reset so that it can be reused.
699730 // We haven't yet implemented that here. We can consider doing so but it's not an
700731 // immediate priority. Implementing it correctly requires making sure the internal
701732 // state of the socket is correctly reset.
702733 if ( this . destroyed ) {
703734 throw new ERR_SOCKET_CLOSED ( ) ;
704735 }
705- let normalized ;
706- // @ts -expect-error TS7015 Required not to overcomplicate types
707- if ( Array . isArray ( args [ 0 ] ) && args [ 0 ] [ normalizedArgsSymbol ] ) {
708- normalized = args [ 0 ] ;
709- } else {
710- normalized = _normalizeArgs ( args ) ;
711- }
712- const options = normalized [ 0 ] as TcpSocketConnectOpts & IpcSocketConnectOpts ;
713- const cb = normalized [ 1 ] as VoidFunction | null ;
714736
715737 if ( cb !== null ) {
716738 this . once ( 'connect' , cb ) ;
@@ -942,7 +964,7 @@ function cleanupAfterDestroy(
942964 queueMicrotask ( ( ) => socket . emit ( 'close' , error != null ) ) ;
943965}
944966
945- function initializeConnection (
967+ export function initializeConnection (
946968 socket : Socket ,
947969 options : TcpSocketConnectOpts
948970) : void {
@@ -1000,12 +1022,12 @@ function initializeConnection(
10001022 const handle = inner . connect ( `${ host } :${ port } ` , {
10011023 allowHalfOpen : socket . allowHalfOpen ,
10021024 // A Node.js socket is always capable of being upgraded to the TLS socket.
1003- secureTransport : 'starttls' ,
1004- // We are not going to pass the high water mark here. The outer Node.js
1025+ secureTransport : socket . encrypted ? 'on' : 'starttls' ,
1026+ // We are not going to pass the high water- mark here. The outer Node.js
10051027 // stream will implement the appropriate backpressure for us.
10061028 } ) ;
10071029
1008- // Our version of the socket._handle is necessarily different than Node.js'.
1030+ // Our version of the socket._handle is necessarily different from Node.js'.
10091031 // It serves the same purpose but any code that may exist that is depending
10101032 // on `_handle` being a particular type (which it shouldn't be) will fail.
10111033 socket . _handle = {
@@ -1087,6 +1109,10 @@ function onConnectionOpened(this: Socket): void {
10871109 this . emit ( 'connect' ) ;
10881110 this . emit ( 'ready' ) ;
10891111
1112+ if ( this . encrypted ) {
1113+ // This is required for TLSSocket
1114+ this . _finishInit ( ) ;
1115+ }
10901116 if ( ! this . isPaused ( ) ) {
10911117 maybeStartReading ( this ) ;
10921118 }
@@ -1175,6 +1201,10 @@ async function startRead(socket: Socket): Promise<void> {
11751201 break ;
11761202 }
11771203 }
1204+ } catch ( _err ) {
1205+ // Ignore error, and don't log them.
1206+ // This is mostly triggered for invalid sockets with following errors:
1207+ // - "This ReadableStream belongs to an object that is closing."
11781208 } finally {
11791209 // Disable eslint to match Node.js behavior
11801210 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
@@ -1288,12 +1318,8 @@ export function _normalizeArgs(args: unknown[]): unknown[] {
12881318 return arr ;
12891319}
12901320
1291- function addClientAbortSignalOption (
1292- self : Socket ,
1293- options : { signal ?: AbortSignal }
1294- ) : void {
1295- validateAbortSignal ( options . signal , 'options.signal' ) ;
1296- const { signal } = options ;
1321+ function addClientAbortSignalOption ( self : Socket , signal : AbortSignal ) : void {
1322+ validateAbortSignal ( signal , 'options.signal' ) ;
12971323 let disposable : Disposable | undefined ;
12981324
12991325 function onAbort ( ) : void {
@@ -1303,7 +1329,7 @@ function addClientAbortSignalOption(
13031329 }
13041330
13051331 if ( signal . aborted ) {
1306- queueMicrotask ( onAbort ) ;
1332+ onAbort ( ) ;
13071333 } else {
13081334 queueMicrotask ( ( ) => {
13091335 disposable = addAbortListener ( signal , onAbort ) ;
0 commit comments