Skip to content

Commit dd459ed

Browse files
committed
nt.skiproot, winapi.LCMapStringEx
1 parent 4828fb3 commit dd459ed

File tree

4 files changed

+274
-20
lines changed

4 files changed

+274
-20
lines changed

Lib/test/test_ntpath.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ def test_splitdrive(self):
130130
tester('ntpath.splitdrive("//?/UNC/server/share/dir")',
131131
("//?/UNC/server/share", "/dir"))
132132

133-
# TODO: RUSTPYTHON
134-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
135133
def test_splitdrive_invalid_paths(self):
136134
splitdrive = ntpath.splitdrive
137135
self.assertEqual(splitdrive('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
@@ -238,8 +236,6 @@ def test_splitroot(self):
238236
tester('ntpath.splitroot(" :/foo")', (" :", "/", "foo"))
239237
tester('ntpath.splitroot("/:/foo")', ("", "/", ":/foo"))
240238

241-
# TODO: RUSTPYTHON
242-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
243239
def test_splitroot_invalid_paths(self):
244240
splitroot = ntpath.splitroot
245241
self.assertEqual(splitroot('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
@@ -268,8 +264,6 @@ def test_split(self):
268264
tester('ntpath.split("c:/")', ('c:/', ''))
269265
tester('ntpath.split("//conky/mountpoint/")', ('//conky/mountpoint/', ''))
270266

271-
# TODO: RUSTPYTHON
272-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
273267
def test_split_invalid_paths(self):
274268
split = ntpath.split
275269
self.assertEqual(split('c:\\fo\x00o\\ba\x00r'),
@@ -392,8 +386,6 @@ def test_join(self):
392386
tester("ntpath.join('D:a', './c:b')", 'D:a\\.\\c:b')
393387
tester("ntpath.join('D:/a', './c:b')", 'D:\\a\\.\\c:b')
394388

395-
# TODO: RUSTPYTHON
396-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
397389
def test_normcase(self):
398390
normcase = ntpath.normcase
399391
self.assertEqual(normcase(''), '')
@@ -409,8 +401,6 @@ def test_normcase(self):
409401
self.assertEqual(normcase('\u03a9\u2126'.encode()),
410402
expected.encode())
411403

412-
# TODO: RUSTPYTHON
413-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
414404
def test_normcase_invalid_paths(self):
415405
normcase = ntpath.normcase
416406
self.assertEqual(normcase('abc\x00def'), 'abc\x00def')
@@ -468,8 +458,6 @@ def test_normpath(self):
468458
tester("ntpath.normpath('\\\\')", '\\\\')
469459
tester("ntpath.normpath('//?/UNC/server/share/..')", '\\\\?\\UNC\\server\\share\\')
470460

471-
# TODO: RUSTPYTHON
472-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
473461
def test_normpath_invalid_paths(self):
474462
normpath = ntpath.normpath
475463
self.assertEqual(normpath('fo\x00o'), 'fo\x00o')
@@ -1130,8 +1118,6 @@ def test_abspath(self):
11301118
drive, _ = ntpath.splitdrive(cwd_dir)
11311119
tester('ntpath.abspath("/abc/")', drive + "\\abc")
11321120

1133-
# TODO: RUSTPYTHON
1134-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
11351121
def test_abspath_invalid_paths(self):
11361122
abspath = ntpath.abspath
11371123
if sys.platform == 'win32':
@@ -1438,8 +1424,6 @@ def test_isfile_anonymous_pipe(self):
14381424
os.close(pr)
14391425
os.close(pw)
14401426

1441-
# TODO: RUSTPYTHON
1442-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
14431427
@unittest.skipIf(sys.platform != 'win32', "windows only")
14441428
def test_isfile_named_pipe(self):
14451429
import _winapi

Lib/test/test_posixpath.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@ def test_dirname(self):
189189
self.assertEqual(posixpath.dirname(b"////foo"), b"////")
190190
self.assertEqual(posixpath.dirname(b"//foo//bar"), b"//foo")
191191

192-
# TODO: RUSTPYTHON
193-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
194192
def test_islink(self):
195193
self.assertIs(posixpath.islink(TESTFN + "1"), False)
196194
self.assertIs(posixpath.lexists(TESTFN + "2"), False)
@@ -236,8 +234,6 @@ def test_ismount_invalid_paths(self):
236234
self.assertIs(posixpath.ismount('/\x00'), False)
237235
self.assertIs(posixpath.ismount(b'/\x00'), False)
238236

239-
# TODO: RUSTPYTHON
240-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
241237
@os_helper.skip_unless_symlink
242238
def test_ismount_symlinks(self):
243239
# Symlinks are never mountpoints.

crates/vm/src/stdlib/nt.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,151 @@ pub(crate) mod module {
570570
Ok(path.mode.process_path(buffer.to_os_string(), vm))
571571
}
572572

573+
/// Implements CPython's _Py_skiproot logic for Windows paths
574+
/// Returns (drive_size, root_size) where:
575+
/// - drive_size: length of the drive/UNC portion
576+
/// - root_size: length of the root separator (0 or 1)
577+
fn skiproot(path: &[u16]) -> (usize, usize) {
578+
let len = path.len();
579+
if len == 0 {
580+
return (0, 0);
581+
}
582+
583+
const SEP: u16 = b'\\' as u16;
584+
const ALTSEP: u16 = b'/' as u16;
585+
const COLON: u16 = b':' as u16;
586+
587+
let is_sep = |c: u16| c == SEP || c == ALTSEP;
588+
let get = |i: usize| path.get(i).copied().unwrap_or(0);
589+
590+
if is_sep(get(0)) {
591+
if is_sep(get(1)) {
592+
// UNC or device path: \\server\share or \\?\device
593+
// Check for \\?\UNC\server\share
594+
let idx = if len >= 8
595+
&& get(2) == b'?' as u16
596+
&& is_sep(get(3))
597+
&& (get(4) == b'U' as u16 || get(4) == b'u' as u16)
598+
&& (get(5) == b'N' as u16 || get(5) == b'n' as u16)
599+
&& (get(6) == b'C' as u16 || get(6) == b'c' as u16)
600+
&& is_sep(get(7))
601+
{
602+
8
603+
} else {
604+
2
605+
};
606+
607+
// Find the end of server name
608+
let mut i = idx;
609+
while i < len && !is_sep(get(i)) {
610+
i += 1;
611+
}
612+
613+
if i >= len {
614+
// No share part: \\server
615+
return (i, 0);
616+
}
617+
618+
// Skip separator and find end of share name
619+
i += 1;
620+
while i < len && !is_sep(get(i)) {
621+
i += 1;
622+
}
623+
624+
// drive = \\server\share, root = \ (if present)
625+
if i >= len { (i, 0) } else { (i, 1) }
626+
} else {
627+
// Relative path with root: \Windows
628+
(0, 1)
629+
}
630+
} else if len >= 2 && get(1) == COLON {
631+
// Drive letter path
632+
if len >= 3 && is_sep(get(2)) {
633+
// Absolute: X:\Windows
634+
(2, 1)
635+
} else {
636+
// Relative with drive: X:Windows
637+
(2, 0)
638+
}
639+
} else {
640+
// Relative path: Windows
641+
(0, 0)
642+
}
643+
}
644+
645+
#[pyfunction]
646+
fn _path_splitroot_ex(path: crate::PyObjectRef, vm: &VirtualMachine) -> PyResult<PyTupleRef> {
647+
use crate::builtins::{PyBytes, PyStr};
648+
use rustpython_common::wtf8::Wtf8Buf;
649+
650+
// Handle path-like objects via os.fspath, but without null check (nonstrict=True in CPython)
651+
let path = if let Some(fspath) = vm.get_method(path.clone(), identifier!(vm, __fspath__)) {
652+
fspath?.call((), vm)?
653+
} else {
654+
path
655+
};
656+
657+
// Convert to wide string, validating UTF-8 for bytes input
658+
let (wide, is_bytes): (Vec<u16>, bool) = if let Some(s) = path.downcast_ref::<PyStr>() {
659+
// Use encode_wide which handles WTF-8 (including surrogates)
660+
let wide: Vec<u16> = s.as_wtf8().encode_wide().collect();
661+
(wide, false)
662+
} else if let Some(b) = path.downcast_ref::<PyBytes>() {
663+
// On Windows, bytes must be valid UTF-8 - this raises UnicodeDecodeError if not
664+
let s = std::str::from_utf8(b.as_bytes()).map_err(|e| {
665+
vm.new_exception_msg(
666+
vm.ctx.exceptions.unicode_decode_error.to_owned(),
667+
format!(
668+
"'utf-8' codec can't decode byte {:#x} in position {}: invalid start byte",
669+
b.as_bytes().get(e.valid_up_to()).copied().unwrap_or(0),
670+
e.valid_up_to()
671+
),
672+
)
673+
})?;
674+
let wide: Vec<u16> = s.encode_utf16().collect();
675+
(wide, true)
676+
} else {
677+
return Err(vm.new_type_error(format!(
678+
"expected str or bytes, not {}",
679+
path.class().name()
680+
)));
681+
};
682+
683+
// Normalize slashes for parsing
684+
let normalized: Vec<u16> = wide
685+
.iter()
686+
.map(|&c| if c == b'/' as u16 { b'\\' as u16 } else { c })
687+
.collect();
688+
689+
let (drv_size, root_size) = skiproot(&normalized);
690+
691+
// Return as bytes if input was bytes, preserving the original content
692+
if is_bytes {
693+
// Convert UTF-16 back to UTF-8 for bytes output
694+
let drv = String::from_utf16(&wide[..drv_size])
695+
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
696+
let root = String::from_utf16(&wide[drv_size..drv_size + root_size])
697+
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
698+
let tail = String::from_utf16(&wide[drv_size + root_size..])
699+
.map_err(|e| vm.new_unicode_decode_error(e.to_string()))?;
700+
Ok(vm.ctx.new_tuple(vec![
701+
vm.ctx.new_bytes(drv.into_bytes()).into(),
702+
vm.ctx.new_bytes(root.into_bytes()).into(),
703+
vm.ctx.new_bytes(tail.into_bytes()).into(),
704+
]))
705+
} else {
706+
// For str output, use WTF-8 to handle surrogates
707+
let drv = Wtf8Buf::from_wide(&wide[..drv_size]);
708+
let root = Wtf8Buf::from_wide(&wide[drv_size..drv_size + root_size]);
709+
let tail = Wtf8Buf::from_wide(&wide[drv_size + root_size..]);
710+
Ok(vm.ctx.new_tuple(vec![
711+
vm.ctx.new_str(drv).into(),
712+
vm.ctx.new_str(root).into(),
713+
vm.ctx.new_str(tail).into(),
714+
]))
715+
}
716+
}
717+
573718
#[pyfunction]
574719
fn _path_splitroot(path: OsPath, vm: &VirtualMachine) -> PyResult<(String, String)> {
575720
let orig: Vec<_> = path.path.to_wide();

crates/vm/src/stdlib/winapi.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,4 +540,133 @@ mod _winapi {
540540
windows_sys::Win32::System::Threading::ReleaseMutex(handle as _)
541541
})
542542
}
543+
544+
// LOCALE_NAME_INVARIANT is an empty string in Windows API
545+
#[pyattr]
546+
const LOCALE_NAME_INVARIANT: &str = "";
547+
548+
/// LCMapStringEx - Map a string to another string using locale-specific rules
549+
/// This is used by ntpath.normcase() for proper Windows case conversion
550+
#[pyfunction]
551+
fn LCMapStringEx(
552+
locale: PyStrRef,
553+
flags: u32,
554+
src: PyStrRef,
555+
vm: &VirtualMachine,
556+
) -> PyResult<PyStrRef> {
557+
use rustpython_common::wtf8::Wtf8Buf;
558+
use windows_sys::Win32::Globalization::{
559+
LCMAP_BYTEREV, LCMAP_HASH, LCMAP_SORTHANDLE, LCMAP_SORTKEY,
560+
LCMapStringEx as WinLCMapStringEx,
561+
};
562+
563+
// Reject unsupported flags (same as CPython)
564+
if flags & (LCMAP_SORTHANDLE | LCMAP_HASH | LCMAP_BYTEREV | LCMAP_SORTKEY) != 0 {
565+
return Err(vm.new_value_error("unsupported flags"));
566+
}
567+
568+
// Use encode_wide() which properly handles WTF-8 (including surrogates)
569+
let locale_wide: Vec<u16> = locale
570+
.as_wtf8()
571+
.encode_wide()
572+
.chain(std::iter::once(0))
573+
.collect();
574+
let src_wide: Vec<u16> = src.as_wtf8().encode_wide().collect();
575+
576+
if src_wide.len() > i32::MAX as usize {
577+
return Err(vm.new_overflow_error("input string is too long".to_string()));
578+
}
579+
580+
// First call to get required buffer size
581+
let dest_size = unsafe {
582+
WinLCMapStringEx(
583+
locale_wide.as_ptr(),
584+
flags,
585+
src_wide.as_ptr(),
586+
src_wide.len() as i32,
587+
null_mut(),
588+
0,
589+
null(),
590+
null(),
591+
0,
592+
)
593+
};
594+
595+
if dest_size <= 0 {
596+
return Err(vm.new_last_os_error());
597+
}
598+
599+
// Second call to perform the mapping
600+
let mut dest = vec![0u16; dest_size as usize];
601+
let nmapped = unsafe {
602+
WinLCMapStringEx(
603+
locale_wide.as_ptr(),
604+
flags,
605+
src_wide.as_ptr(),
606+
src_wide.len() as i32,
607+
dest.as_mut_ptr(),
608+
dest_size,
609+
null(),
610+
null(),
611+
0,
612+
)
613+
};
614+
615+
if nmapped <= 0 {
616+
return Err(vm.new_last_os_error());
617+
}
618+
619+
dest.truncate(nmapped as usize);
620+
621+
// Convert UTF-16 back to WTF-8 (handles surrogates properly)
622+
let result = Wtf8Buf::from_wide(&dest);
623+
Ok(vm.ctx.new_str(result))
624+
}
625+
626+
#[derive(FromArgs)]
627+
struct CreateNamedPipeArgs {
628+
#[pyarg(positional)]
629+
name: PyStrRef,
630+
#[pyarg(positional)]
631+
open_mode: u32,
632+
#[pyarg(positional)]
633+
pipe_mode: u32,
634+
#[pyarg(positional)]
635+
max_instances: u32,
636+
#[pyarg(positional)]
637+
out_buffer_size: u32,
638+
#[pyarg(positional)]
639+
in_buffer_size: u32,
640+
#[pyarg(positional)]
641+
default_timeout: u32,
642+
#[pyarg(positional)]
643+
_security_attributes: PyObjectRef, // Ignored, can be None
644+
}
645+
646+
/// CreateNamedPipe - Create a named pipe
647+
#[pyfunction]
648+
fn CreateNamedPipe(args: CreateNamedPipeArgs, vm: &VirtualMachine) -> PyResult<WinHandle> {
649+
use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
650+
651+
let name_wide = args.name.as_str().to_wide_with_nul();
652+
653+
let handle = unsafe {
654+
CreateNamedPipeW(
655+
name_wide.as_ptr(),
656+
args.open_mode,
657+
args.pipe_mode,
658+
args.max_instances,
659+
args.out_buffer_size,
660+
args.in_buffer_size,
661+
args.default_timeout,
662+
null(), // security_attributes - NULL for now
663+
)
664+
};
665+
666+
if handle == INVALID_HANDLE_VALUE {
667+
return Err(vm.new_last_os_error());
668+
}
669+
670+
Ok(WinHandle(handle))
671+
}
543672
}

0 commit comments

Comments
 (0)