Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,11 @@ public static void shutDownExistingCluster() {
* completed checkpoint's.
*/
@Test(timeout = 60000)
public void testMultiRegionFailover() {
try {
JobGraph jobGraph = createJobGraph();
ClusterClient<?> client = cluster.getClusterClient();
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
verifyAfterJobExecuted();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
public void testMultiRegionFailover() throws Exception {
JobGraph jobGraph = createJobGraph();
ClusterClient<?> client = cluster.getClusterClient();
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
verifyAfterJobExecuted();
}

private void verifyAfterJobExecuted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Integration test for the {@link CheckpointListener} interface. The test ensures that {@link
Expand Down Expand Up @@ -83,74 +82,67 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBaseJUnit4 {
* </pre>
*/
@Test
public void testProgram() {
try {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
assertEquals("test setup broken", PARALLELISM, env.getParallelism());
public void testProgram() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
assertEquals("test setup broken", PARALLELISM, env.getParallelism());

env.enableCheckpointing(500);
RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
env.enableCheckpointing(500);
RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);

final int numElements = 10000;
final int numTaskTotal = PARALLELISM * 5;
final int numElements = 10000;
final int numTaskTotal = PARALLELISM * 5;

DataStream<Long> stream =
env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
DataStream<Long> stream =
env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));

stream
// -------------- first vertex, chained to the src ----------------
.filter(new LongRichFilterFunction())
stream
// -------------- first vertex, chained to the src ----------------
.filter(new LongRichFilterFunction())

// -------------- second vertex, applying the co-map ----------------
.connect(stream)
.flatMap(new LeftIdentityCoRichFlatMapFunction())
// -------------- second vertex, applying the co-map ----------------
.connect(stream)
.flatMap(new LeftIdentityCoRichFlatMapFunction())

// -------------- third vertex - the stateful one that also fails
// ----------------
.map(new IdentityMapFunction())
.startNewChain()
// -------------- third vertex - the stateful one that also fails
// ----------------
.map(new IdentityMapFunction())
.startNewChain()

// -------------- fourth vertex - reducer and the sink ----------------
.keyBy(x -> x.f0)
.reduce(new OnceFailingReducer(numElements))
.sinkTo(new DiscardingSink<>());
// -------------- fourth vertex - reducer and the sink ----------------
.keyBy(x -> x.f0)
.reduce(new OnceFailingReducer(numElements))
.sinkTo(new DiscardingSink<>());

env.execute();
env.execute();

final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
assertNotEquals(0L, failureCheckpointID);
final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
assertNotEquals(0L, failureCheckpointID);

List<List<Long>[]> allLists =
Arrays.asList(
GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
LongRichFilterFunction.COMPLETED_CHECKPOINTS,
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
IdentityMapFunction.COMPLETED_CHECKPOINTS,
OnceFailingReducer.COMPLETED_CHECKPOINTS);
List<List<Long>[]> allLists =
Arrays.asList(
GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
LongRichFilterFunction.COMPLETED_CHECKPOINTS,
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
IdentityMapFunction.COMPLETED_CHECKPOINTS,
OnceFailingReducer.COMPLETED_CHECKPOINTS);

for (List<Long>[] parallelNotifications : allLists) {
for (List<Long> notifications : parallelNotifications) {
for (List<Long>[] parallelNotifications : allLists) {
for (List<Long> notifications : parallelNotifications) {

assertTrue(
"No checkpoint notification was received.", notifications.size() > 0);
assertTrue("No checkpoint notification was received.", notifications.size() > 0);

assertFalse(
"Failure checkpoint was marked as completed.",
notifications.contains(failureCheckpointID));
assertFalse(
"Failure checkpoint was marked as completed.",
notifications.contains(failureCheckpointID));

assertFalse(
"No checkpoint received after failure.",
notifications.get(notifications.size() - 1) == failureCheckpointID);
assertFalse(
"No checkpoint received after failure.",
notifications.get(notifications.size() - 1) == failureCheckpointID);

assertTrue(
"Checkpoint notification was received multiple times",
notifications.size() == new HashSet<Long>(notifications).size());
}
assertTrue(
"Checkpoint notification was received multiple times",
notifications.size() == new HashSet<Long>(notifications).size());
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,22 @@ public void shutDownExistingCluster() {
*/
@Test
public void runCheckpointedProgram() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);

testProgram(env);

JobGraph jobGraph = env.getStreamGraph().getJobGraph();
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);

testProgram(env);

JobGraph jobGraph = env.getStreamGraph().getJobGraph();
try {
submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
} catch (Exception e) {
Assert.assertTrue(
ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
}

postSubmit();
submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
}

postSubmit();
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,10 @@ public void testSplitComparison() {
Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
}

@Test
@Test(expected = IllegalArgumentException.class)
public void testIllegalArgument() {
try {
new TimestampedFileInputSplit(
-10, 2, new Path("test"), 0, 100, null); // invalid modification time
} catch (Exception e) {
if (!(e instanceof IllegalArgumentException)) {
Assert.fail(e.getMessage());
}
}
new TimestampedFileInputSplit(
-10, 2, new Path("test"), 0, 100, null); // invalid modification time
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -67,32 +66,26 @@ public void before() {
}

@Test(timeout = 60_000)
public void testLocalExecutorWithWordCount() throws InterruptedException {
try {
// set up the files
File inFile = File.createTempFile("wctext", ".in");
File outFile = File.createTempFile("wctext", ".out");
inFile.deleteOnExit();
outFile.deleteOnExit();

try (FileWriter fw = new FileWriter(inFile)) {
fw.write(WordCountData.TEXT);
}

final Configuration config = new Configuration();
config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.set(DeploymentOptions.ATTACHED, true);

StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
JobClient jobClient =
executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader())
.get();
jobClient.getJobExecutionResult().get();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
public void testLocalExecutorWithWordCount() throws Exception {
// set up the files
File inFile = File.createTempFile("wctext", ".in");
File outFile = File.createTempFile("wctext", ".out");
inFile.deleteOnExit();
outFile.deleteOnExit();

try (FileWriter fw = new FileWriter(inFile)) {
fw.write(WordCountData.TEXT);
}

final Configuration config = new Configuration();
config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
config.set(DeploymentOptions.ATTACHED, true);

StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
JobClient jobClient =
executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader()).get();
jobClient.getJobExecutionResult().get();

assertThat(miniCluster.isRunning(), is(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;

import static org.junit.Assert.fail;

/**
* Tests for non rich DataSource and DataSink input output formats being correctly used at runtime.
*/
Expand All @@ -38,12 +36,7 @@ protected void testProgram() throws Exception {
TestNonRichOutputFormat output = new TestNonRichOutputFormat();
env.createInput(new TestNonRichInputFormat())
.addSink(new OutputFormatSinkFunction<>(output));
try {
env.execute();
} catch (Exception e) {
// we didn't break anything by making everything rich.
e.printStackTrace();
fail(e.getMessage());
}
env.execute();
// we didn't break anything by making everything rich.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Test for proper error messages in case user-defined serialization is broken and detected in the
Expand All @@ -67,7 +66,7 @@ public static Configuration getConfiguration() {
}

@Test
public void testIncorrectSerializer1() {
public void testIncorrectSerializer1() throws Exception {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
Expand All @@ -93,14 +92,11 @@ public ConsumesTooMuch map(Long value) throws Exception {
.getMessage()
.contains("broken serialization."))
.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testIncorrectSerializer2() {
public void testIncorrectSerializer2() throws Exception {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
Expand All @@ -126,14 +122,11 @@ public ConsumesTooMuchSpanning map(Long value) throws Exception {
.getMessage()
.contains("broken serialization."))
.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testIncorrectSerializer3() {
public void testIncorrectSerializer3() throws Exception {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
Expand All @@ -159,14 +152,11 @@ public ConsumesTooLittle map(Long value) throws Exception {
.getMessage()
.contains("broken serialization."))
.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testIncorrectSerializer4() {
public void testIncorrectSerializer4() throws Exception {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
Expand All @@ -192,9 +182,6 @@ public ConsumesTooLittleSpanning map(Long value) throws Exception {
.getMessage()
.contains("broken serialization."))
.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

Expand Down
Loading