-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBiObservableExample.java
More file actions
152 lines (125 loc) · 5.24 KB
/
BiObservableExample.java
File metadata and controls
152 lines (125 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package rx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.junit.Test;
public class BiObservableExample {
/**
 * Demonstrates an end-to-end BiObservable pipeline: each job request is paired with its job,
 * the job is run with the request's parameters, the outcome is rendered as a text report, and
 * the reports are combined with the available output writers for delivery.
 */
@Test
public void exampleOfUsage() {
    // Source of the persistent stream of job requests.
    JobService requestSource = new JobService();
    // Source of the writers that results are serialized to.
    OutputService writerSource = new OutputService();

    Observable<JobRequest> pendingRequests = requestSource.getNewJobRequests("foo");

    // For every request: look up its job, run it, and format the outcome as a report string.
    // The lambdas keep explicit parameter types and block bodies on purpose.
    Observable<String> reports = BiObservable
            .generate(pendingRequests, (JobRequest request) -> {
                return findJob(request);
            })
            .map2((JobRequest request, Job job) -> {
                List<String> arguments = request.getParameters();
                return job.doWork(arguments);
            })
            .bimap((JobRequest request, String output) -> {
                String idLine = "JobId: " + request.getId() + "\n";
                String parameterLine = "Parameters: " + request.getParameters().toString() + "\n";
                return idLine + parameterLine + "Results:\n" + output;
            });

    // Zero or more writers may be subscribed.
    Observable<OutputWriter> writers = writerSource.getOutputSubscribers();
    // Cached to prevent resubscribing to the writer stream.
    Observable<OutputWriter> cachedWriters = writers.cache();

    // For each report emitted and each writer: write the report, then notify the subscriber.
    // NOTE(review): `subcribe` is spelled exactly as the call was written; confirm it matches
    // the method name declared by BiObservable before "correcting" it.
    BiObservable.product(reports, cachedWriters)
            .doOnNext((String report, OutputWriter writer) -> {
                System.out.println("Finished job");
                writer.write(report);
            })
            .subcribe(new BiSubscriber<String, OutputWriter>() {
                @Override
                public void onComplete() {
                    System.out.println("Completed pipeline");
                }

                @Override
                public void onError(Throwable failure) {
                    Objects.requireNonNull(failure);
                    System.out.println("Error");
                    failure.printStackTrace();
                }

                @Override
                public void onNext(String report, OutputWriter writer) {
                    // Both halves of the pair are expected to be present; the writer is checked first.
                    Objects.requireNonNull(writer);
                    Objects.requireNonNull(report);
                    System.out.println("Published job results to subscribed writer: " + writer.getName());
                }
            });
}
/**
 * Looks up the job registered under the request's job name.
 *
 * @param jobRequest the request whose job name selects the job
 * @return the job registered under {@code jobRequest.getJobName()}, never {@code null}
 * @throws RuntimeException if no job is registered under that name
 */
public Job findJob(JobRequest jobRequest) {
    // Map.get reports a missing key by returning null, not by throwing, so the lookup
    // failure has to be detected explicitly; otherwise a null job would leak downstream
    // and fail later with an uninformative NullPointerException.
    Job job = jobMap.get(jobRequest.getJobName());
    if (job == null) {
        throw new RuntimeException("Failed to find job for request id:" + jobRequest.getId());
    }
    return job;
}
// Registry of runnable jobs keyed by job name; consulted by findJob.
private final Map<String, Job> jobMap = createJobMap();

/**
 * Builds the job registry as a plain {@link HashMap}.
 *
 * <p>A regular factory method is used instead of double-brace initialization, which would
 * create an anonymous {@code HashMap} subclass holding a reference to the enclosing instance
 * (and would need a {@code serial} warning suppression).
 *
 * @return a map containing the single example job registered under {@code "foo"}
 */
private static Map<String, Job> createJobMap() {
    Map<String, Job> jobs = new HashMap<String, Job>();
    jobs.put("foo", new Job() {
        @Override
        public String doWork(List<String> parameters) {
            return "Okay";
        }
    });
    return jobs;
}
/**
 * Immutable-reference holder describing one request to run a job: a numeric id, the name of
 * the job to run, and the parameters to run it with.
 */
private static class JobRequest {

    private final int id;
    private final String jobName;
    private final List<String> parameters;

    /**
     * @param id         identifier of this request
     * @param jobName    name of the job to run
     * @param parameters parameters for the job; the list is stored as given, not copied
     */
    public JobRequest(int id, String jobName, List<String> parameters) {
        this.id = id;
        this.jobName = jobName;
        this.parameters = parameters;
    }

    /** @return the identifier given at construction */
    public int getId() {
        return this.id;
    }

    /** @return the job name given at construction */
    public String getJobName() {
        return this.jobName;
    }

    /** @return the same list instance that was given at construction */
    public List<String> getParameters() {
        return this.parameters;
    }
}
/**
 * A unit of work that is executed with a list of string parameters and reports its outcome
 * as a string. Implementations are registered by name in the enclosing class's job map.
 */
private abstract static class Job {
/**
 * Runs the job.
 *
 * @param parameters the parameters to run with (callers pass the request's parameter list)
 * @return the job's result text
 */
public abstract String doWork(List<String> parameters);
}
/**
 * Example service that produces a stream of {@link JobRequest}s for a named job, each carrying
 * the standard parameters configured for that job.
 */
private static class JobService {

    // Standard parameters per job name; populated once in the constructor.
    private final Map<String, List<String>> parameterMap = new HashMap<String, List<String>>();

    /** Registers the standard parameters for the single example job {@code "foo"}. */
    public JobService() {
        parameterMap.put("foo", Arrays.asList("-bar", "-qux"));
    }

    /**
     * Creates the stream of requests for the given job.
     *
     * <p>Request ids are taken from {@code Observable.range(0, 10)}; every request is built
     * with the same copy of the job's standard parameters.
     * NOTE(review): assumes {@code BiObservable.attach} pairs each emitted id with the supplied
     * list — confirm against the BiObservable API.
     *
     * @param jobName name of the job to create requests for
     * @return an observable of job requests for {@code jobName}
     * @throws IllegalArgumentException if no standard parameters are configured for {@code jobName}
     */
    public Observable<JobRequest> getNewJobRequests(String jobName) {
        List<String> standardParameters = getStandardParameters(jobName);
        return BiObservable.attach(Observable.range(0, 10), standardParameters)
                .bimap((id, defaultParameters) -> new JobRequest(id, jobName, defaultParameters));
    }

    /**
     * Returns a fresh mutable copy of the standard parameters configured for a job.
     *
     * @param jobName name of the job
     * @return a new list holding the configured parameters
     * @throws IllegalArgumentException if nothing is configured for {@code jobName}
     */
    private List<String> getStandardParameters(String jobName) {
        List<String> configured = parameterMap.get(jobName);
        if (configured == null) {
            // Without this check, copying a null list fails with a message-less NullPointerException.
            throw new IllegalArgumentException("No standard parameters configured for job: " + jobName);
        }
        return new ArrayList<String>(configured);
    }
}
/**
 * Example service that exposes the output writers results should be delivered to.
 */
private static class OutputService {

    /**
     * @return an observable emitting the single example writer named "Pub/Sub Integration"
     */
    public Observable<OutputWriter> getOutputSubscribers() {
        // In a real system this would normally come from a pool of output workers or
        // something more sophisticated; the example hard-codes one writer.
        OutputWriter pubSubWriter = new OutputWriter("Pub/Sub Integration");
        return Observable.just(pubSubWriter);
    }
}
/**
 * A named sink that writes result data to standard output.
 */
private static class OutputWriter {

    // Assigned once in the constructor and never changed, hence final.
    private final String name;

    /**
     * @param name display name of this writer
     */
    public OutputWriter(String name) {
        this.name = name;
    }

    /**
     * Prints the data to standard output, preceded by an "Output Writer:" header line.
     *
     * @param data the text to write
     */
    public void write(String data) {
        System.out.println("Output Writer:\n" + data);
    }

    /** @return the name given at construction */
    public String getName() {
        return name;
    }
}
}