Skip to content

Commit

Permalink
PDI-19719 - Fixed NPE when pgCopyOut is closing
Browse files Browse the repository at this point in the history
  • Loading branch information
petrprochy committed Feb 10, 2023
1 parent 123cb2c commit 67035b9
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2023 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -30,22 +30,16 @@
//
//

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
Expand All @@ -54,11 +48,15 @@
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.postgresql.PGConnection;
import org.postgresql.copy.PGCopyOutputStream;

import com.google.common.annotations.VisibleForTesting;

import org.postgresql.PGConnection;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* Performs a bulk load to a postgres table.
Expand Down Expand Up @@ -492,11 +490,8 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (PGBulkLoaderMeta) smi;
data = (PGBulkLoaderData) sdi;

try {
pgCopyOut.close();
} catch ( IOException e ) {
logError( "Error while closing the Postgres Output Stream", e.getMessage() );
}
IOUtils.closeQuietly( pgCopyOut,
e -> logError( "Error while closing the Postgres Output Stream", e.getMessage() ) );

if ( data.db != null ) {
data.db.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2017-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2017-2023 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -21,20 +21,6 @@
******************************************************************************/
package org.pentaho.di.trans.steps.pgbulkloader;

import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -58,10 +44,20 @@
import java.io.IOException;
import java.lang.reflect.Field;

import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.pentaho.di.core.row.ValueMetaInterface.TYPE_BOOLEAN;

public class PGBulkLoaderTest {
Expand Down Expand Up @@ -207,6 +203,37 @@ public void writeBooleanToPgOutput() throws Exception {
assertEquals( "false", "0" + Const.CR, out.toString() );
}

/**
* Regression test for PDI-18989
*/
@Test
public void emptyInput() throws Exception {
final PGBulkLoaderMeta meta = initMeta( "field" );
final PGBulkLoaderData data = initData();
final PGBulkLoader loader = spy( this.pgBulkLoader );
doReturn( null ).when( loader ).getRow();

loader.init( meta, data );
assertFalse( "Step finished", loader.processRow( meta, data ) );
loader.dispose( meta, data );
}

/**
* Step does not have defined any output fields -> initialize of copy command failed.
*/
@Test
public void do_copyFailed() throws Exception {
final PGBulkLoaderMeta meta = initMeta();
final PGBulkLoaderData data = initData();
final PGBulkLoader loader = spy( this.pgBulkLoader );
doReturn( new Object[] { "test data" } ).when( loader ).getRow();

loader.init( meta, data );
assertFalse( "Process row failed", loader.processRow( meta, data ) );
assertEquals( "Step has error", 1, loader.getErrors() );
loader.dispose( meta, data );
}

private ByteArrayOutputStream initPGCopyOutputStream() throws IOException, NoSuchFieldException, IllegalAccessException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final PGCopyOutputStream pgCopy = mock( PGCopyOutputStream.class );
Expand All @@ -226,10 +253,10 @@ private RowMeta initRowMeta( String valueName, int type ) throws KettlePluginExc
return rowMeta;
}

private PGBulkLoaderMeta initMeta( String valueName ) throws KettleXMLException {
private PGBulkLoaderMeta initMeta( String... valueNames ) throws KettleXMLException {
final PGBulkLoaderMeta meta = getPgBulkLoaderMock( null );
when( meta.getFieldStream() ).thenReturn( new String[] {valueName} );
when( meta.getDateMask() ).thenReturn( new String[] {""} );
when( meta.getFieldStream() ).thenReturn( valueNames );
when( meta.getDateMask() ).thenReturn( new String[] { "" } );
return meta;
}

Expand Down

0 comments on commit 67035b9

Please sign in to comment.