diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 5a75971be6..1816a63559 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -3649,8 +3649,6 @@ def test_shell_string(self): with p: self.assertIn(b"physalis", p.stdout.read()) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_encodings(self): # Run command through the shell (string) for enc in ['ansi', 'oem']: @@ -3869,28 +3867,20 @@ def with_spaces(self, *args, **kwargs): "2 [%r, 'ab cd']" % self.fname ) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd"), shell=1) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_shell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_noshell_string_with_spaces(self): # call() function with string argument with spaces on Windows self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_noshell_sequence_with_spaces(self): # call() function with sequence argument with spaces on Windows self.with_spaces([sys.executable, self.fname, "ab cd"]) diff --git a/crates/vm/src/stdlib/codecs.rs b/crates/vm/src/stdlib/codecs.rs index 46b1608f24..dee5b17b82 100644 --- a/crates/vm/src/stdlib/codecs.rs +++ b/crates/vm/src/stdlib/codecs.rs @@ -225,14 +225,410 @@ mod _codecs { }}; } + #[cfg(windows)] + #[derive(FromArgs)] + struct MbcsEncodeArgs { + #[pyarg(positional)] + s: PyStrRef, + #[pyarg(positional, optional)] + errors: Option, + } + + #[cfg(windows)] + #[pyfunction] + fn mbcs_encode(args: MbcsEncodeArgs, vm: &VirtualMachine) -> PyResult<(Vec, usize)> { + use std::os::windows::ffi::OsStrExt; + use windows_sys::Win32::Globalization::{ + CP_ACP, WC_NO_BEST_FIT_CHARS, WideCharToMultiByte, + }; + + let errors = args.errors.as_ref().map(|s| s.as_str()).unwrap_or("strict"); + let s = match args.s.to_str() { + Some(s) => s, + None => { + // String contains surrogates - not encodable with mbcs + return Err(vm.new_unicode_encode_error( + "'mbcs' codec can't encode character: surrogates not allowed".to_string(), + )); + } + }; + let char_len = args.s.char_len(); + + if s.is_empty() { + return Ok((Vec::new(), char_len)); + } + + // Convert UTF-8 string to UTF-16 + let wide: Vec = std::ffi::OsStr::new(s).encode_wide().collect(); + + // Get the required buffer size + let size = unsafe { + WideCharToMultiByte( + CP_ACP, + WC_NO_BEST_FIT_CHARS, + wide.as_ptr(), + wide.len() as i32, + std::ptr::null_mut(), + 0, + std::ptr::null(), + std::ptr::null_mut(), + ) + }; + + if size == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("mbcs_encode failed: {}", err))); + } + + let mut buffer = vec![0u8; size as usize]; + let mut used_default_char: i32 = 0; + + let result = unsafe { + WideCharToMultiByte( + CP_ACP, + WC_NO_BEST_FIT_CHARS, + wide.as_ptr(), + wide.len() as i32, + buffer.as_mut_ptr().cast(), + size, + std::ptr::null(), + if errors == "strict" { + &mut used_default_char + } else { + std::ptr::null_mut() + }, + ) + }; + + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("mbcs_encode failed: {err}"))); + } + + if errors == "strict" && used_default_char != 0 { + return Err(vm.new_unicode_encode_error( + "'mbcs' codec can't encode characters: invalid character", + )); + } + + buffer.truncate(result as usize); + Ok((buffer, char_len)) + } + + #[cfg(not(windows))] #[pyfunction] fn mbcs_encode(args: FuncArgs, vm: &VirtualMachine) -> PyResult { delegate_pycodecs!(mbcs_encode, args, vm) } + + #[cfg(windows)] + #[derive(FromArgs)] + struct MbcsDecodeArgs { + #[pyarg(positional)] + data: ArgBytesLike, + #[pyarg(positional, optional)] + errors: Option, + #[pyarg(positional, default = false)] + #[allow(dead_code)] + r#final: bool, + } + + #[cfg(windows)] + #[pyfunction] + fn mbcs_decode(args: MbcsDecodeArgs, vm: &VirtualMachine) -> PyResult<(String, usize)> { + use windows_sys::Win32::Globalization::{ + CP_ACP, MB_ERR_INVALID_CHARS, MultiByteToWideChar, + }; + + let _errors = args.errors.as_ref().map(|s| s.as_str()).unwrap_or("strict"); + let data = args.data.borrow_buf(); + let len = data.len(); + + if data.is_empty() { + return Ok((String::new(), 0)); + } + + // Get the required buffer size for UTF-16 + let size = unsafe { + MultiByteToWideChar( + CP_ACP, + MB_ERR_INVALID_CHARS, + data.as_ptr().cast(), + len as i32, + std::ptr::null_mut(), + 0, + ) + }; + + if size == 0 { + // Try without MB_ERR_INVALID_CHARS for non-strict mode (replacement behavior) + let size = unsafe { + MultiByteToWideChar( + CP_ACP, + 0, + data.as_ptr().cast(), + len as i32, + std::ptr::null_mut(), + 0, + ) + }; + if size == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("mbcs_decode failed: {}", err))); + } + + let mut buffer = vec![0u16; size as usize]; + let result = unsafe { + MultiByteToWideChar( + CP_ACP, + 0, + data.as_ptr().cast(), + len as i32, + buffer.as_mut_ptr(), + size, + ) + }; + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("mbcs_decode failed: {}", err))); + } + buffer.truncate(result as usize); + let s = String::from_utf16(&buffer) + .map_err(|e| vm.new_unicode_decode_error(format!("mbcs_decode failed: {}", e)))?; + return Ok((s, len)); + } + + // Strict mode succeeded - no invalid characters + let mut buffer = vec![0u16; size as usize]; + let result = unsafe { + MultiByteToWideChar( + CP_ACP, + MB_ERR_INVALID_CHARS, + data.as_ptr().cast(), + len as i32, + buffer.as_mut_ptr(), + size, + ) + }; + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("mbcs_decode failed: {}", err))); + } + buffer.truncate(result as usize); + let s = String::from_utf16(&buffer) + .map_err(|e| vm.new_unicode_decode_error(format!("mbcs_decode failed: {}", e)))?; + + Ok((s, len)) + } + + #[cfg(not(windows))] #[pyfunction] fn mbcs_decode(args: FuncArgs, vm: &VirtualMachine) -> PyResult { delegate_pycodecs!(mbcs_decode, args, vm) } + + #[cfg(windows)] + #[derive(FromArgs)] + struct OemEncodeArgs { + #[pyarg(positional)] + s: PyStrRef, + #[pyarg(positional, optional)] + errors: Option, + } + + #[cfg(windows)] + #[pyfunction] + fn oem_encode(args: OemEncodeArgs, vm: &VirtualMachine) -> PyResult<(Vec, usize)> { + use std::os::windows::ffi::OsStrExt; + use windows_sys::Win32::Globalization::{ + CP_OEMCP, WC_NO_BEST_FIT_CHARS, WideCharToMultiByte, + }; + + let errors = args.errors.as_ref().map(|s| s.as_str()).unwrap_or("strict"); + let s = match args.s.to_str() { + Some(s) => s, + None => { + // String contains surrogates - not encodable with oem + return Err(vm.new_unicode_encode_error( + "'oem' codec can't encode character: surrogates not allowed".to_string(), + )); + } + }; + let char_len = args.s.char_len(); + + if s.is_empty() { + return Ok((Vec::new(), char_len)); + } + + // Convert UTF-8 string to UTF-16 + let wide: Vec = std::ffi::OsStr::new(s).encode_wide().collect(); + + // Get the required buffer size + let size = unsafe { + WideCharToMultiByte( + CP_OEMCP, + WC_NO_BEST_FIT_CHARS, + wide.as_ptr(), + wide.len() as i32, + std::ptr::null_mut(), + 0, + std::ptr::null(), + std::ptr::null_mut(), + ) + }; + + if size == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("oem_encode failed: {}", err))); + } + + let mut buffer = vec![0u8; size as usize]; + let mut used_default_char: i32 = 0; + + let result = unsafe { + WideCharToMultiByte( + CP_OEMCP, + WC_NO_BEST_FIT_CHARS, + wide.as_ptr(), + wide.len() as i32, + buffer.as_mut_ptr().cast(), + size, + std::ptr::null(), + if errors == "strict" { + &mut used_default_char + } else { + std::ptr::null_mut() + }, + ) + }; + + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("oem_encode failed: {err}"))); + } + + if errors == "strict" && used_default_char != 0 { + return Err(vm.new_unicode_encode_error( + "'oem' codec can't encode characters: invalid character", + )); + } + + buffer.truncate(result as usize); + Ok((buffer, char_len)) + } + + #[cfg(not(windows))] + #[pyfunction] + fn oem_encode(args: FuncArgs, vm: &VirtualMachine) -> PyResult { + delegate_pycodecs!(oem_encode, args, vm) + } + + #[cfg(windows)] + #[derive(FromArgs)] + struct OemDecodeArgs { + #[pyarg(positional)] + data: ArgBytesLike, + #[pyarg(positional, optional)] + errors: Option, + #[pyarg(positional, default = false)] + #[allow(dead_code)] + r#final: bool, + } + + #[cfg(windows)] + #[pyfunction] + fn oem_decode(args: OemDecodeArgs, vm: &VirtualMachine) -> PyResult<(String, usize)> { + use windows_sys::Win32::Globalization::{ + CP_OEMCP, MB_ERR_INVALID_CHARS, MultiByteToWideChar, + }; + + let _errors = args.errors.as_ref().map(|s| s.as_str()).unwrap_or("strict"); + let data = args.data.borrow_buf(); + let len = data.len(); + + if data.is_empty() { + return Ok((String::new(), 0)); + } + + // Get the required buffer size for UTF-16 + let size = unsafe { + MultiByteToWideChar( + CP_OEMCP, + MB_ERR_INVALID_CHARS, + data.as_ptr().cast(), + len as i32, + std::ptr::null_mut(), + 0, + ) + }; + + if size == 0 { + // Try without MB_ERR_INVALID_CHARS for non-strict mode (replacement behavior) + let size = unsafe { + MultiByteToWideChar( + CP_OEMCP, + 0, + data.as_ptr().cast(), + len as i32, + std::ptr::null_mut(), + 0, + ) + }; + if size == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("oem_decode failed: {}", err))); + } + + let mut buffer = vec![0u16; size as usize]; + let result = unsafe { + MultiByteToWideChar( + CP_OEMCP, + 0, + data.as_ptr().cast(), + len as i32, + buffer.as_mut_ptr(), + size, + ) + }; + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("oem_decode failed: {}", err))); + } + buffer.truncate(result as usize); + let s = String::from_utf16(&buffer) + .map_err(|e| vm.new_unicode_decode_error(format!("oem_decode failed: {}", e)))?; + return Ok((s, len)); + } + + // Strict mode succeeded - no invalid characters + let mut buffer = vec![0u16; size as usize]; + let result = unsafe { + MultiByteToWideChar( + CP_OEMCP, + MB_ERR_INVALID_CHARS, + data.as_ptr().cast(), + len as i32, + buffer.as_mut_ptr(), + size, + ) + }; + if result == 0 { + let err = std::io::Error::last_os_error(); + return Err(vm.new_os_error(format!("oem_decode failed: {}", err))); + } + buffer.truncate(result as usize); + let s = String::from_utf16(&buffer) + .map_err(|e| vm.new_unicode_decode_error(format!("oem_decode failed: {}", e)))?; + + Ok((s, len)) + } + + #[cfg(not(windows))] + #[pyfunction] + fn oem_decode(args: FuncArgs, vm: &VirtualMachine) -> PyResult { + delegate_pycodecs!(oem_decode, args, vm) + } + #[pyfunction] fn readbuffer_encode(args: FuncArgs, vm: &VirtualMachine) -> PyResult { delegate_pycodecs!(readbuffer_encode, args, vm)