diff --git a/stream/__init__.py b/stream/__init__.py index 470ad0b..9792464 100644 --- a/stream/__init__.py +++ b/stream/__init__.py @@ -1,2 +1,2 @@ from .stream import Stream -from .intstream import IntStream +from .numbers import NumberStream diff --git a/stream/parallelstream.py b/stream/parallelstream.py new file mode 100644 index 0000000..80b9734 --- /dev/null +++ b/stream/parallelstream.py @@ -0,0 +1,298 @@ +from itertools import tee +from multiprocessing import Process, Queue, RLock +import multiprocessing as mp +import queue + +from threading import Thread, Lock +from time import sleep + +from stream.stream import Stream +from stream.util.iterators import IteratorUtils +from stream.util.optional import Optional +from stream.util.threads import StreamThread + + +class ParallelUtils: + + iterLock = Lock() + + @staticmethod + def splitted(iterable, pre, offset): + index = -1 + try: + while True: + index += 1 + elem = next(iterable) + if(index < pre): + continue + if(index % offset == pre): + yield elem + except: + return + + @staticmethod + def _iterator(iterable): + try: + while True: + yield next(iterable) + except: + return + + @staticmethod + def sameSplit(iterable, count): + return [ParallelUtils._iterator(iterable) for _ in range(count)] + + @staticmethod + def split(iterable, count): + return [ParallelUtils.splitted(it, index, count) for index, it in enumerate(tee(iterable, count))] + + @ staticmethod + def finiteSplit(iterable, count): + elements = list(iterable) + chunks = [[] for _ in range(count)] + chunk_size = int(len(elements) / count) + 1 + + for index, elem in enumerate(elements): + chunk = int(index / chunk_size) + chunks[chunk].append(elem) + + return [Stream(chunk) for chunk in chunks] + + +class ParallelStream(Stream): + + PROCESS = 8 + + def __init__(self, iterable_supplier): + self.__streams = [] + for i in range(self.PROCESS): + self.__streams.append(StreamThread( + Stream(lambda: ParallelUtils.splitted(iterable_supplier(), i, self.PROCESS)))) + + for _stream in self.__streams: + _stream.start() + + def filter(self, predicate): + ''' + Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream. + + :param function predicate: predicate to apply to each element to determine if it should be included + :return: the new stream + ''' + for _stream in self.__streams: + _stream.filter(predicate) + + return self + + def map(self, mapper): + + for _stream in self.__streams: + _stream.map(mapper) + + return self + + def flatMap(self, flatMapper): + + for _stream in self.__streams: + _stream.flatMap(flatMapper) + + return self + + def distinct(self): + + for _stream in self.__streams: + _stream.distinct() + + return self + + def peek(self, consumer): + + for _stream in self.__streams: + _stream.peek(consumer) + + return self + + def forEach(self, function): + + for _stream in self.__streams: + _stream.forEach(function) + + for _stream in self.__streams: + _stream.join() + + return self + + def anyMatch(self, predicate): + + for _stream in self.__streams: + _stream.anyMatch(predicate) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult() + for _stream in self.__streams] + + return Stream(results).anyMatch(lambda x: x) + + def allMatch(self, predicate): + + for _stream in self.__streams: + _stream.allMatch(predicate) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult() + for _stream in self.__streams] + + return Stream(results).allMatch(lambda x: x) + + def noneMatch(self, predicate): + + for _stream in self.__streams: + _stream.noneMatch(predicate) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult() + for _stream in self.__streams] + + return Stream(results).allMatch(lambda x: x) + + def findAny(self): + + for _stream in self.__streams: + _stream.findAny() + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult().get() + for _stream in self.__streams if _stream.getResult().isPresent()] + + return Stream(results).findAny() + + def reduce(self, accumulator, identity=None): + + for _stream in self.__streams: + _stream.reduce(accumulator, identity) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult().get() + for _stream in self.__streams if _stream.getResult().isPresent()] + + return Stream(results).reduce(accumulator, identity) + + def min(self, comparator=None): + + for _stream in self.__streams: + _stream.min(comparator) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult().get() + for _stream in self.__streams if _stream.getResult().isPresent()] + + return Stream(results).min(comparator) + + def max(self, comparator=None): + + for _stream in self.__streams: + _stream.max(comparator) + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult().get() + for _stream in self.__streams if _stream.getResult().isPresent()] + + return Stream(results).max(comparator) + + def sum(self): + + for _stream in self.__streams: + _stream.sum() + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult().get() + for _stream in self.__streams if _stream.getResult().isPresent()] + + return Stream(results).sum() + + def count(self): + + for _stream in self.__streams: + _stream.count() + + for _stream in self.__streams: + _stream.join() + + results = [_stream.getResult() + for _stream in self.__streams if _stream.getResult()] + + res = 0 + for r in results: + res += r + + return res + + def toList(self): + + for _stream in self.__streams: + _stream.toList() + + for _stream in self.__streams: + _stream.join() + + sublists = [_stream.getResult() + for _stream in self.__streams] + + results = [] + for sub in sublists: + results.extend(sub) + return results + + def toSet(self): + + for _stream in self.__streams: + _stream.toSet() + + for _stream in self.__streams: + _stream.join() + + subsets = [_stream.getResult() + for _stream in self.__streams] + + results = set() + for sub in subsets: + for elem in sub: + results.add(elem) + + return results + + def get(self): + return self.__streams + + def terminate(self): + for _stream in self.__streams: + _stream.terminate() + + def __eq__(self, other): + return self.toSet() == set(other) + + def __iter__(self): + + def _iter(iters): + for it in iters: + for elem in it: + yield elem + self.terminate() + + return _iter(self.__streams) diff --git a/stream/stream.py b/stream/stream.py index 3578144..46517da 100644 --- a/stream/stream.py +++ b/stream/stream.py @@ -75,7 +75,7 @@ def iterate(seed, operator): :param UnaryOperator operator: a function to be applied to the previous element to produce a new element :return: a new sequential Stream ''' - return Stream(IteratorUtils.iterate(seed, operator)) + return Stream(lambda: IteratorUtils.iterate(seed, operator)) @staticmethod def generate(supplier): @@ -85,7 +85,7 @@ def generate(supplier): :param Supplier supplier: the Supplier of generated elements :return: a new infinite sequential unordered Stream ''' - return Stream(IteratorUtils.generate(supplier)) + return Stream(lambda: IteratorUtils.generate(supplier)) @staticmethod def concat(*streams): @@ -96,17 +96,20 @@ def concat(*streams): :return: the concatenation of the input streams ''' return Stream(IteratorUtils.concat(*streams)) - - @staticmethod - def booleans(): - return Stream.generate(lambda: random.randint(0, 1) == 1) - """ Normal Methods """ def __init__(self, iterable): + def itS(it): + for elem in it: + yield elem self.iterable = iterable + self.__iterable_supplier = lambda: itS(self.iterable) + + def parallel(self): + from .parallelstream import ParallelStream + return ParallelStream(self.__iterable_supplier) def filter(self, predicate): ''' diff --git a/stream/util/threads.py b/stream/util/threads.py new file mode 100644 index 0000000..2d0115c --- /dev/null +++ b/stream/util/threads.py @@ -0,0 +1,176 @@ +from threading import Thread +import queue + + +class StreamThread(Thread): + + def __init__(self, source): + # Call the Thread class's init function + Thread.__init__(self) + + self._queue = queue.Queue() + self._stream = source + self._result = None + self._toTerminate = False + + def _addEvent(self, function, *args): + self._queue.put((function, args)) + + # Filter + def _filter(self, *args): + self._stream = self._stream.filter(args[0]) + + def filter(self, predicate): + self._addEvent(self._filter, predicate) + + # Map + def _map(self, *args): + self._stream = self._stream.map(args[0]) + + def map(self, mapper): + self._addEvent(self._map, mapper) + + # FlatMap + def _flatMap(self, *args): + self._stream = self._stream.flatMap(args[0]) + + def flatMap(self, flatMapper): + self._addEvent(self._flatMap, flatMapper) + + # Distinct + def _distinct(self, *args): + self._stream = self._stream.distinct() + + def distinct(self): + self._addEvent(self._distinct, None) + + # Sorted + def _sorted(self, *args): + self._stream = self._stream.sorted(args[0]) + + def sorted(self, comparator=None): + self._addEvent(self._sorted, comparator) + + # Peek + def _peek(self, *args): + self._stream = self._stream.peek(args[0]) + + def peek(self, consumer): + self._addEvent(self._peek, consumer) + + # ForEach + def _forEach(self, *args): + self._stream.forEach(args[0]) + self.terminate() + + def forEach(self, function): + self._addEvent(self._forEach, function) + + # AnyMatch + def _anyMatch(self, *args): + self._result = self._stream.anyMatch(args[0]) + self.terminate() + + def anyMatch(self, predicate): + self._addEvent(self._anyMatch, predicate) + + # AllMatch + def _allMatch(self, *args): + self._result = self._stream.allMatch(args[0]) + self.terminate() + + def allMatch(self, predicate): + self._addEvent(self._allMatch, predicate) + + # NoneMatch + def _noneMatch(self, *args): + self._anyMatch(args[0]) + self._result = not self._result + self.terminate() + + def noneMatch(self, predicate): + self._addEvent(self._noneMatch, predicate) + + # Find Any + def _findAny(self, *args): + self._result = self._stream.findAny() + self.terminate() + + def findAny(self): + self._addEvent(self._findAny, None) + + # Reduce + def _reduce(self, *args): + self._result = self._stream.reduce(args[0], args[1]) + self.terminate() + + def reduce(self, accumulator, identity=None): + self._addEvent(self._reduce, *(accumulator, identity), None) + + # Min + def _min(self, *args): + self._result = self._stream.min(args[0]) + self.terminate() + + def min(self, comparator=None): + self._addEvent(self._min, comparator) + + # Max + def _max(self, *args): + self._result = self._stream.max(args[0]) + self.terminate() + + def max(self, comparator=None): + self._addEvent(self._max, comparator) + + # Sum + def _sum(self, *args): + self._result = self._stream.sum() + self.terminate() + + def sum(self): + self._addEvent(self._sum, None) + + # Count + def _count(self, *args): + self._result = self._stream.count() + self.terminate() + + def count(self): + self._addEvent(self._count, None) + + # ToList + def _toList(self, *args): + self._result = self._stream.toList() + self.terminate() + + def toList(self): + self._addEvent(self._toList, None) + + # ToSet + def _toSet(self, *args): + self._result = self._stream.toSet() + self.terminate() + + def toSet(self): + self._addEvent(self._toSet, None) + + def run(self): + while not self._toTerminate: + func, args = self._queue.get() + if args: + func(*args) + else: + func() + + def getResult(self): + return self._result + + def _terminate(self, *args): + self._toTerminate = True + + def terminate(self): + self._addEvent(self._terminate, None) + + def __iter__(self): + return self._stream.__iter__()