Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Lib/test/test_fnmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def test_fnmatchcase(self):
check('usr/bin', 'usr\\bin', False, fnmatchcase)
check('usr\\bin', 'usr\\bin', True, fnmatchcase)

@unittest.expectedFailureIfWindows('TODO: RUSTPYTHON')
def test_bytes(self):
self.check_match(b'test', b'te*')
self.check_match(b'test\xff', b'te*\xff')
Expand Down
16 changes: 0 additions & 16 deletions Lib/test/test_ntpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ def test_splitdrive(self):
tester('ntpath.splitdrive("//?/UNC/server/share/dir")',
("//?/UNC/server/share", "/dir"))

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_splitdrive_invalid_paths(self):
splitdrive = ntpath.splitdrive
self.assertEqual(splitdrive('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
Expand Down Expand Up @@ -238,8 +236,6 @@ def test_splitroot(self):
tester('ntpath.splitroot(" :/foo")', (" :", "/", "foo"))
tester('ntpath.splitroot("/:/foo")', ("", "/", ":/foo"))

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_splitroot_invalid_paths(self):
splitroot = ntpath.splitroot
self.assertEqual(splitroot('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
Expand Down Expand Up @@ -268,8 +264,6 @@ def test_split(self):
tester('ntpath.split("c:/")', ('c:/', ''))
tester('ntpath.split("//conky/mountpoint/")', ('//conky/mountpoint/', ''))

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_split_invalid_paths(self):
split = ntpath.split
self.assertEqual(split('c:\\fo\x00o\\ba\x00r'),
Expand Down Expand Up @@ -392,8 +386,6 @@ def test_join(self):
tester("ntpath.join('D:a', './c:b')", 'D:a\\.\\c:b')
tester("ntpath.join('D:/a', './c:b')", 'D:\\a\\.\\c:b')

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_normcase(self):
normcase = ntpath.normcase
self.assertEqual(normcase(''), '')
Expand All @@ -409,8 +401,6 @@ def test_normcase(self):
self.assertEqual(normcase('\u03a9\u2126'.encode()),
expected.encode())

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_normcase_invalid_paths(self):
normcase = ntpath.normcase
self.assertEqual(normcase('abc\x00def'), 'abc\x00def')
Expand Down Expand Up @@ -468,8 +458,6 @@ def test_normpath(self):
tester("ntpath.normpath('\\\\')", '\\\\')
tester("ntpath.normpath('//?/UNC/server/share/..')", '\\\\?\\UNC\\server\\share\\')

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_normpath_invalid_paths(self):
normpath = ntpath.normpath
self.assertEqual(normpath('fo\x00o'), 'fo\x00o')
Expand Down Expand Up @@ -1130,8 +1118,6 @@ def test_abspath(self):
drive, _ = ntpath.splitdrive(cwd_dir)
tester('ntpath.abspath("/abc/")', drive + "\\abc")

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_abspath_invalid_paths(self):
abspath = ntpath.abspath
if sys.platform == 'win32':
Expand Down Expand Up @@ -1438,8 +1424,6 @@ def test_isfile_anonymous_pipe(self):
os.close(pr)
os.close(pw)

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skipIf(sys.platform != 'win32', "windows only")
def test_isfile_named_pipe(self):
import _winapi
Expand Down
4 changes: 0 additions & 4 deletions Lib/test/test_posixpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ def test_dirname(self):
self.assertEqual(posixpath.dirname(b"////foo"), b"////")
self.assertEqual(posixpath.dirname(b"//foo//bar"), b"//foo")

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_islink(self):
self.assertIs(posixpath.islink(TESTFN + "1"), False)
self.assertIs(posixpath.lexists(TESTFN + "2"), False)
Expand Down Expand Up @@ -236,8 +234,6 @@ def test_ismount_invalid_paths(self):
self.assertIs(posixpath.ismount('/\x00'), False)
self.assertIs(posixpath.ismount(b'/\x00'), False)

# TODO: RUSTPYTHON
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@os_helper.skip_unless_symlink
def test_ismount_symlinks(self):
# Symlinks are never mountpoints.
Expand Down
145 changes: 145 additions & 0 deletions crates/vm/src/stdlib/nt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,151 @@ pub(crate) mod module {
Ok(path.mode.process_path(buffer.to_os_string(), vm))
}

/// Implements CPython's _Py_skiproot logic for Windows paths
/// Returns (drive_size, root_size) where:
/// - drive_size: length of the drive/UNC portion
/// - root_size: length of the root separator (0 or 1)
fn skiproot(path: &[u16]) -> (usize, usize) {
let len = path.len();
if len == 0 {
return (0, 0);
}

const SEP: u16 = b'\\' as u16;
const ALTSEP: u16 = b'/' as u16;
const COLON: u16 = b':' as u16;

let is_sep = |c: u16| c == SEP || c == ALTSEP;
let get = |i: usize| path.get(i).copied().unwrap_or(0);

if is_sep(get(0)) {
if is_sep(get(1)) {
// UNC or device path: \\server\share or \\?\device
// Check for \\?\UNC\server\share
let idx = if len >= 8
&& get(2) == b'?' as u16
&& is_sep(get(3))
&& (get(4) == b'U' as u16 || get(4) == b'u' as u16)
&& (get(5) == b'N' as u16 || get(5) == b'n' as u16)
&& (get(6) == b'C' as u16 || get(6) == b'c' as u16)
&& is_sep(get(7))
{
8
} else {
2
};

// Find the end of server name
let mut i = idx;
while i < len && !is_sep(get(i)) {
i += 1;
}

if i >= len {
// No share part: \\server
return (i, 0);
}

// Skip separator and find end of share name
i += 1;
while i < len && !is_sep(get(i)) {
i += 1;
}

// drive = \\server\share, root = \ (if present)
if i >= len { (i, 0) } else { (i, 1) }
} else {
// Relative path with root: \Windows
(0, 1)
}
} else if len >= 2 && get(1) == COLON {
// Drive letter path
if len >= 3 && is_sep(get(2)) {
// Absolute: X:\Windows
(2, 1)
} else {
// Relative with drive: X:Windows
(2, 0)
}
} else {
// Relative path: Windows
(0, 0)
}
}

#[pyfunction]
fn _path_splitroot_ex(path: crate::PyObjectRef, vm: &VirtualMachine) -> PyResult<PyTupleRef> {
use crate::builtins::{PyBytes, PyStr};
use rustpython_common::wtf8::Wtf8Buf;

// Handle path-like objects via os.fspath, but without null check (nonstrict=True in CPython)
let path = if let Some(fspath) = vm.get_method(path.clone(), identifier!(vm, __fspath__)) {
fspath?.call((), vm)?
} else {
path
};

// Convert to wide string, validating UTF-8 for bytes input
let (wide, is_bytes): (Vec<u16>, bool) = if let Some(s) = path.downcast_ref::<PyStr>() {
// Use encode_wide which handles WTF-8 (including surrogates)
let wide: Vec<u16> = s.as_wtf8().encode_wide().collect();
(wide, false)
} else if let Some(b) = path.downcast_ref::<PyBytes>() {
// On Windows, bytes must be valid UTF-8 - this raises UnicodeDecodeError if not
let s = std::str::from_utf8(b.as_bytes()).map_err(|e| {
vm.new_exception_msg(
vm.ctx.exceptions.unicode_decode_error.to_owned(),
format!(
"'utf-8' codec can't decode byte {:#x} in position {}: invalid start byte",
b.as_bytes().get(e.valid_up_to()).copied().unwrap_or(0),
e.valid_up_to()
),
)
})?;
let wide: Vec<u16> = s.encode_utf16().collect();
(wide, true)
} else {
return Err(vm.new_type_error(format!(
"expected str or bytes, not {}",
path.class().name()
)));
};

// Normalize slashes for parsing
let normalized: Vec<u16> = wide
.iter()
.map(|&c| if c == b'/' as u16 { b'\\' as u16 } else { c })
.collect();

let (drv_size, root_size) = skiproot(&normalized);

// Return as bytes if input was bytes, preserving the original content
if is_bytes {
// Convert UTF-16 back to UTF-8 for bytes output
let drv = String::from_utf16(&wide[..drv_size])
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
let root = String::from_utf16(&wide[drv_size..drv_size + root_size])
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
let tail = String::from_utf16(&wide[drv_size + root_size..])
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
Ok(vm.ctx.new_tuple(vec![
vm.ctx.new_bytes(drv.into_bytes()).into(),
vm.ctx.new_bytes(root.into_bytes()).into(),
vm.ctx.new_bytes(tail.into_bytes()).into(),
]))
} else {
// For str output, use WTF-8 to handle surrogates
let drv = Wtf8Buf::from_wide(&wide[..drv_size]);
let root = Wtf8Buf::from_wide(&wide[drv_size..drv_size + root_size]);
let tail = Wtf8Buf::from_wide(&wide[drv_size + root_size..]);
Ok(vm.ctx.new_tuple(vec![
vm.ctx.new_str(drv).into(),
vm.ctx.new_str(root).into(),
vm.ctx.new_str(tail).into(),
]))
}
}

#[pyfunction]
fn _path_splitroot(path: OsPath, vm: &VirtualMachine) -> PyResult<(String, String)> {
let orig: Vec<_> = path.path.to_wide();
Expand Down
129 changes: 129 additions & 0 deletions crates/vm/src/stdlib/winapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,4 +540,133 @@ mod _winapi {
windows_sys::Win32::System::Threading::ReleaseMutex(handle as _)
})
}

// LOCALE_NAME_INVARIANT is an empty string in Windows API
#[pyattr]
const LOCALE_NAME_INVARIANT: &str = "";

/// LCMapStringEx - Map a string to another string using locale-specific rules
/// This is used by ntpath.normcase() for proper Windows case conversion
#[pyfunction]
fn LCMapStringEx(
locale: PyStrRef,
flags: u32,
src: PyStrRef,
vm: &VirtualMachine,
) -> PyResult<PyStrRef> {
use rustpython_common::wtf8::Wtf8Buf;
use windows_sys::Win32::Globalization::{
LCMAP_BYTEREV, LCMAP_HASH, LCMAP_SORTHANDLE, LCMAP_SORTKEY,
LCMapStringEx as WinLCMapStringEx,
};

// Reject unsupported flags (same as CPython)
if flags & (LCMAP_SORTHANDLE | LCMAP_HASH | LCMAP_BYTEREV | LCMAP_SORTKEY) != 0 {
return Err(vm.new_value_error("unsupported flags"));
}

// Use encode_wide() which properly handles WTF-8 (including surrogates)
let locale_wide: Vec<u16> = locale
.as_wtf8()
.encode_wide()
.chain(std::iter::once(0))
.collect();
let src_wide: Vec<u16> = src.as_wtf8().encode_wide().collect();

if src_wide.len() > i32::MAX as usize {
return Err(vm.new_overflow_error("input string is too long".to_string()));
}

// First call to get required buffer size
let dest_size = unsafe {
WinLCMapStringEx(
locale_wide.as_ptr(),
flags,
src_wide.as_ptr(),
src_wide.len() as i32,
null_mut(),
0,
null(),
null(),
0,
)
};

if dest_size <= 0 {
return Err(vm.new_last_os_error());
}

// Second call to perform the mapping
let mut dest = vec![0u16; dest_size as usize];
let nmapped = unsafe {
WinLCMapStringEx(
locale_wide.as_ptr(),
flags,
src_wide.as_ptr(),
src_wide.len() as i32,
dest.as_mut_ptr(),
dest_size,
null(),
null(),
0,
)
};

if nmapped <= 0 {
return Err(vm.new_last_os_error());
}

dest.truncate(nmapped as usize);

// Convert UTF-16 back to WTF-8 (handles surrogates properly)
let result = Wtf8Buf::from_wide(&dest);
Ok(vm.ctx.new_str(result))
}

#[derive(FromArgs)]
struct CreateNamedPipeArgs {
#[pyarg(positional)]
name: PyStrRef,
#[pyarg(positional)]
open_mode: u32,
#[pyarg(positional)]
pipe_mode: u32,
#[pyarg(positional)]
max_instances: u32,
#[pyarg(positional)]
out_buffer_size: u32,
#[pyarg(positional)]
in_buffer_size: u32,
#[pyarg(positional)]
default_timeout: u32,
#[pyarg(positional)]
_security_attributes: PyObjectRef, // Ignored, can be None
}

/// CreateNamedPipe - Create a named pipe
#[pyfunction]
fn CreateNamedPipe(args: CreateNamedPipeArgs, vm: &VirtualMachine) -> PyResult<WinHandle> {
use windows_sys::Win32::System::Pipes::CreateNamedPipeW;

let name_wide = args.name.as_str().to_wide_with_nul();

let handle = unsafe {
CreateNamedPipeW(
name_wide.as_ptr(),
args.open_mode,
args.pipe_mode,
args.max_instances,
args.out_buffer_size,
args.in_buffer_size,
args.default_timeout,
null(), // security_attributes - NULL for now
)
};

if handle == INVALID_HANDLE_VALUE {
return Err(vm.new_last_os_error());
}

Ok(WinHandle(handle))
}
}
Loading