Hi friends,
Today I am going to show you, how you can create a SSIS package programmatically in C#.
Before that, let's discuss first problem statement or scenario where we might need this kind of solution.
Problem Statement: I need to extract data from multiple tables in a database and save that extracted data to a separate database for the purpose of read-only use (for example Reporting purpose). We need to do this because we want to avoid load on OLTP system. Requirement is to automate this process, so any time I should be able to create a new table (based on new requirement coming) and load/update data in that table without writing code each time.
Solution: Here I created a table(as per below mentioned table schema) programmatically in database now next step to load data on this table.
CREATE TABLE [dbo].[TestTableDest]
(
[ProductID] [int] IDENTITY(1,1) NOT NULL,
[Name] [varchar](50) NOT NULL,
[ProductNumber] [varchar](50) NOT NULL,
CONSTRAINT [IX_TestTableDest] UNIQUE NONCLUSTERED
(
[ProductID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = ON, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
Now let's create SSIS package to load data to this table programmatically using C#. Requirement to insert new rows (if it is new) or update the existing row (if it is already exists) based on unique key value.
To do this we need below assemble reference to our project.
1. Microsoft.SqlServer.ADONETSrc
Location: C:\Program Files (x86)\Microsoft SQL Server\100\DTS\PipelineComponents\Microsoft.SqlServer.ADONETSrc.dll
2. Microsoft.SqlServer.DTSPipelineWrap
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.DTSPipelineWrap.dll
3. Microsoft.SqlServer.DTSRuntimeWrap
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.DTSRuntimeWrap.dll
4. Microsoft.SqlServer.ManagedDTS
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SQLServer.ManagedDTS.dll
5. Microsoft.SqlServer.PipelineHost
Location: C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies\Microsoft.SqlServer.PipelineHost.dll
And then below is the code which will do our task as defined here.
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml;
namespace IDS.DynamicSSIS
{
class Program3
{
static void Main(string[] args)
{
Application app = new Application();
Package package = new Package();
#region Add Data Flow Task
Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");
// Set
the name (otherwise it will be a random GUID value)
TaskHost taskHost = dataFlowTask as TaskHost;
taskHost.Name = "Data Flow Task";
// We
need a reference to the InnerObject to add items to the data flow
MainPipe pipeline = taskHost.InnerObject as MainPipe;
#endregion Add Data Flow Task
#region Add connection manager
ConnectionManager connection = package.Connections.Add("OLEDB");
connection.Name = "localhost";
connection.ConnectionString = "Data Source=localhost;Initial
Catalog=TestDB;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto
Translate=False;";
#endregion
Add connection manager
#region Add OLE DB Source Data Flow
// Add
OLEDB Source
IDTSComponentMetaData100 srcComponent =
pipeline.ComponentMetaDataCollection.New();
srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
srcComponent.ValidateExternalMetadata = true;
IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
srcDesignTimeComponent.ProvideComponentProperties();
srcComponent.Name = "OleDb Source";
//
Configure it to read from the given table
srcDesignTimeComponent.SetComponentProperty("AccessMode", 2);// 2 - SQL Command
srcDesignTimeComponent.SetComponentProperty("SqlCommand", "Select * from dbo.TestTableSource");
// Set
the connection manager
srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
connection.ID;
//
Retrieve the column metadata
srcDesignTimeComponent.AcquireConnections(null);
srcDesignTimeComponent.ReinitializeMetaData();
srcDesignTimeComponent.ReleaseConnections();
IDTSOutputColumnCollection100 sourceColumns =
srcComponent.OutputCollection[0].OutputColumnCollection;
#endregion Add OLE DB Source Data Flow
#region lookup transform
// Add
transform
IDTSComponentMetaData100 lookupComponent =
pipeline.ComponentMetaDataCollection.New();
lookupComponent.ComponentClassID = "DTSTransform.Lookup";
lookupComponent.Name = "Lookup";
CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
lookupWrapper.ProvideComponentProperties();
//
Connect the source and the transform
IDTSPath100 lookUpPath = pipeline.PathCollection.New();
lookUpPath.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0],
lookupComponent.InputCollection[0]);
// Set
the connection manager
lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
connection.ID;
// Cache
Type - Full = 0, Partial = 1, None = 2
lookupWrapper.SetComponentProperty("CacheType", 0);
lookupWrapper.SetComponentProperty("NoMatchBehavior",
1);// 1= Redirect rows to No Match output
lookupWrapper.SetComponentProperty("SqlCommand", "select Name, ProductNumber from
[dbo].[TestTableDest]");
//
initialize metadata
lookupWrapper.AcquireConnections(null);
lookupWrapper.ReinitializeMetaData();
lookupWrapper.ReleaseConnections();
// Mark
the columns we are joining on
IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
IDTSInputColumnCollection100 lookupInputColumns = lookupInput.InputColumnCollection;
IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns =
lookupVirtualInput.VirtualInputColumnCollection;
// Note:
join columns should be marked as READONLY
var joinColumns = new string[] { "Name" };
foreach (string columnName in joinColumns)
{
IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID,
lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);
lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
}
// First
output is the Match output
IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];
//
Second output is the Un-Match output
IDTSOutput100 lookupNoMatchOutput =
lookupComponent.OutputCollection[1];
#endregion lookup transform
#region Add OLE DB Destination Data Flow for No Match Output
// Add
OLEDB Source
IDTSComponentMetaData100 destNoMatchComponent =
pipeline.ComponentMetaDataCollection.New();
destNoMatchComponent.ComponentClassID
= "DTSAdapter.OleDbDestination";
destNoMatchComponent.ValidateExternalMetadata = true;
IDTSDesigntimeComponent100 destNoMatchDesignTimeComponent =
destNoMatchComponent.Instantiate();
destNoMatchDesignTimeComponent.ProvideComponentProperties();
destNoMatchComponent.Name = "OleDb Destination";
//
Configure it to read from the given table
destNoMatchDesignTimeComponent.SetComponentProperty("AccessMode", 3);// 3 - OpenRowset
destNoMatchDesignTimeComponent.SetComponentProperty("OpenRowset", "[dbo].[TestTableDest]");
// Set
the connection manager
destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
destNoMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID
= connection.ID;
//
Retrieve the column metadata
destNoMatchDesignTimeComponent.AcquireConnections(null);
destNoMatchDesignTimeComponent.ReinitializeMetaData();
destNoMatchDesignTimeComponent.ReleaseConnections();
#endregion Add OLE DB Destination Data Flow for No Match Output
#region Add OLE DB Command Data Flow for Matching Output
// Add
OLEDB Source
IDTSComponentMetaData100 destMatchComponent =
pipeline.ComponentMetaDataCollection.New();
destMatchComponent.ComponentClassID
= "DTSTransform.OLEDBCommand";
destMatchComponent.ValidateExternalMetadata =
true;
IDTSDesigntimeComponent100 destMatchDesignTimeComponent =
destMatchComponent.Instantiate();
destMatchDesignTimeComponent.ProvideComponentProperties();
destMatchComponent.Name = "OleDb Command";
//
Configure it to read from the given table
destMatchDesignTimeComponent.SetComponentProperty("CommandTimeout",
0);
destMatchDesignTimeComponent.SetComponentProperty("SqlCommand", "UPDATE [dbo].[TestTableDest] SET [ProductNumber] = ?
WHERE [Name] = ?");
// Set
the connection manager
destMatchComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
destMatchComponent.RuntimeConnectionCollection[0].ConnectionManagerID
= connection.ID;
//
Retrieve the column metadata
destMatchDesignTimeComponent.AcquireConnections(null);
destMatchDesignTimeComponent.ReinitializeMetaData();
destMatchDesignTimeComponent.ReleaseConnections();
#endregion Add OLE DB Command Data Flow for No Match Output
#region Connect source and destination for No Match Outputs
IDTSPath100 noMatchPath = pipeline.PathCollection.New();
noMatchPath.AttachPathAndPropagateNotifications(lookupNoMatchOutput,
destNoMatchComponent.InputCollection[0]);
//
Configure the destination
IDTSInput100 destNoMatchInput = destNoMatchComponent.InputCollection[0];
IDTSVirtualInput100 destNoMatchVirInput = destNoMatchInput.GetVirtualInput();
IDTSInputColumnCollection100 destNoMatchInputCols =
destNoMatchInput.InputColumnCollection;
IDTSExternalMetadataColumnCollection100 destNoMatchExtCols =
destNoMatchInput.ExternalMetadataColumnCollection;
// The
OLEDB destination requires you to hook up the external columns
foreach (IDTSOutputColumn100 outputCol in sourceColumns)
{
//
Get the external column id
IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destNoMatchExtCols[outputCol.Name];
if (extCol != null)
{
//
Create an input column from an output col of previous component.
destNoMatchVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);
IDTSInputColumn100
inputCol = destNoMatchInputCols.GetInputColumnByLineageID(outputCol.ID);
if (inputCol != null)
{
// map the input column with an external metadata column
destNoMatchDesignTimeComponent.MapInputColumn(destNoMatchInput.ID,
inputCol.ID, extCol.ID);
}
}
}
#endregion Connect source and destination for No Match Outputs
#region Connect source and destination for Matching Outputs
IDTSPath100 matchPath = pipeline.PathCollection.New();
matchPath.AttachPathAndPropagateNotifications(lookupMatchOutput,
destMatchComponent.InputCollection[0]);
//
Configure the destination
IDTSInput100 destMatchInput = destMatchComponent.InputCollection[0];
IDTSVirtualInput100 destMatchVirInput = destMatchInput.GetVirtualInput();
IDTSInputColumnCollection100 destMatchInputCols =
destMatchInput.InputColumnCollection;
IDTSExternalMetadataColumnCollection100 destMatchExtCols =
destMatchInput.ExternalMetadataColumnCollection;
// The
OLEDB destination requires you to hook up the external columns
Dictionary<string, string> parameters = new Dictionary<string, string>();
parameters.Add("Param_0", "ProductNumber");
parameters.Add("Param_1", "Name");
foreach (KeyValuePair<string, string> paramValue in parameters)
{
//
Get the external column id
IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destMatchExtCols[paramValue.Key];
if (extCol != null)
{
foreach (IDTSOutputColumn100
outputCol in
sourceColumns)
{
if (outputCol.Name ==
paramValue.Value)
{
// Create an input column from an output col of previous
component.
destMatchVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);
IDTSInputColumn100
inputCol = destMatchInputCols.GetInputColumnByLineageID(outputCol.ID);
if (inputCol != null)
{
// map the input column with an external metadata column
destMatchDesignTimeComponent.MapInputColumn(destMatchInput.ID,
inputCol.ID, extCol.ID);
}
}
}
}
}
#endregion Connect source and destination for No Match Outputs
#region save package as xml
string packageXML = @"C:\ORCC\POC\pkg1.dtsx";
XmlDocument myPkgDocument = new XmlDocument();
package.SaveToXML(ref myPkgDocument, null, null);
package.SaveToXML(ref myPkgDocument, null, null);
app.SaveToXml(packageXML, package, null);
#endregion save package as xml
package.Execute();
Console.WriteLine("Completed");
Console.ReadLine();
}
}
}